Merge branch 'slf/chain-repair'

This commit is contained in:
Scott Lystig Fritchie 2015-05-17 20:56:36 +09:00
commit 7919ff6c6e
17 changed files with 1712 additions and 280 deletions

View file

@ -31,20 +31,24 @@ func, and pattern match Erlang style in that func.
*** DONE Preserve current test code (leave as-is? tiny changes?)
*** DONE Make chain manager code flexible enough to run "real world" or "sim"
** DONE Add projection wedging logic to each FLU.
** Started.... Implement real data repair, orchestrated by the chain manager
** TODO Change all protocol ops to enforce the epoch ID
** DONE Implement real data repair, orchestrated by the chain manager
** DONE Change all protocol ops to enforce the epoch ID
- Add no-wedging state to make testing easier?
** TODO Move the FLU server to gen_server behavior?
** TODO Adapt the projection-aware, CR-implementing client from demo-day
** TODO Create parallel PULSE test for basic API plus chain manager repair
** TODO Add gproc and get rid of registered name rendezvous
*** TODO Fixes the atom table leak
*** TODO Fixes the problem of having active sequencer for the same prefix
on two FLUS in the same VM
** TODO Fix all known bugs with Chain Manager
** TODO Fix all known bugs with Chain Manager (list below)
*** DONE Fix known bugs
*** DONE Clean up crufty TODO comments and other obvious cruft
*** TODO Re-add verification step of stable epochs, including inner projections!
*** TODO Attempt to remove cruft items in flapping_i?
** TODO Move the FLU server to gen_server behavior?

View file

@ -24,19 +24,3 @@
-type projection() :: #projection_v1{}.
-record(ch_mgr, {
name :: pv1_server(),
flap_limit :: non_neg_integer(),
proj :: projection(),
%%
timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
proj_history :: queue:queue(),
flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING
:: erlang:timestamp(),
runenv :: list(), %proplist()
opts :: list(), %proplist()
members_dict :: p_srvr_dict(),
proxies_dict :: orddict:orddict()
}).

View file

@ -29,7 +29,7 @@
-record(p_srvr, {
name :: pv1_server(),
proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm.
proto_mod = 'machi_flu1_client' :: atom(), % Module name
address :: term(), % Protocol-specific
port :: term(), % Protocol-specific
props = [] :: list() % proplist for other related info
@ -58,4 +58,8 @@
-define(SHA_MAX, (1 bsl (20*8))).
%% Set a limit to the maximum chain length, so that it's easier to
%% create a consistent projection ranking score.
-define(MAX_CHAIN_LENGTH, 64).
-endif. % !MACHI_PROJECTION_HRL

View file

@ -46,11 +46,11 @@ verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
verify_file_checksums_local2(Sock1, EpochID, Path)
after
catch gen_tcp:close(Sock1)
catch ?FLU_C:disconnect(Sock1)
end.
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
@ -62,11 +62,11 @@ verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
verify_file_checksums_remote2(Sock1, EpochID, File)
after
catch gen_tcp:close(Sock1)
catch ?FLU_C:disconnect(Sock1)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -54,16 +54,43 @@
-include("machi_projection.hrl").
-include("machi_chain_manager.hrl").
-record(ch_mgr, {
name :: pv1_server(),
flap_limit :: non_neg_integer(),
proj :: projection(),
%%
timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
proj_history :: queue:queue(),
flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING
:: erlang:timestamp(),
repair_worker :: 'undefined' | pid(),
repair_start :: 'undefined' | erlang:timestamp(),
repair_final_status :: 'undefined' | term(),
runenv :: list(), %proplist()
opts :: list(), %proplist()
members_dict :: p_srvr_dict(),
proxies_dict :: orddict:orddict()
}).
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])).
-define(FLU_C, machi_flu1_client).
-define(FLU_PC, machi_proxy_flu1_client).
-define(TO, (2*1000)). % default timeout
%% Keep a history of our flowchart execution in the process dictionary.
-define(REACT(T), put(react, [T|get(react)])).
%% Define the period of private projection stability before we'll
%% start repair.
-ifdef(TEST).
-define(REPAIR_START_STABILITY_TIME, 3).
-else. % TEST
-define(REPAIR_START_STABILITY_TIME, 10).
-endif. % TEST
%% API
-export([start_link/2, start_link/3, stop/1, ping/1,
set_chain_members/2, set_active/2]).
@ -162,6 +189,7 @@ test_react_to_env(Pid) ->
%% local projection store.
init({MyName, InitMembersDict, MgrOpts}) ->
random:seed(now()),
init_remember_partition_hack(),
ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)],
ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict),
@ -223,7 +251,11 @@ handle_call({set_chain_members, MembersDict}, _From,
repairing=[],
down=NewDown,
members_dict=MembersDict}),
S3 = S2#ch_mgr{proj=NewProj},
%% Reset all flapping state.
NewProj2 = NewProj#projection_v1{dbg=replace(NewProj#projection_v1.dbg,
[make_flapping_i()])},
S3 = S2#ch_mgr{proj=NewProj2,
proj_history=queue:new()},
{_QQ, S4} = do_react_to_env(S3),
{reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
@ -255,7 +287,7 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
Res = {Perhaps, Val, ExtraInfo},
{reply, Res, S2};
handle_call({test_react_to_env}, _From, S) ->
{TODOtodo, S2} = do_react_to_env(S),
{TODOtodo, S2} = do_react_to_env(S),
{reply, TODOtodo, S2};
handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}.
@ -267,7 +299,9 @@ handle_cast(_Cast, S) ->
handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) ->
{noreply, S};
handle_info(tick_check_environment, S) ->
{{_Delta, Props, _Epoch}, S2} = do_react_to_env(S),
{{_Delta, Props, _Epoch}, S1} = do_react_to_env(S),
S2 = sanitize_repair_state(S1),
S3 = perhaps_start_repair(S2),
case proplists:get_value(throttle_seconds, Props) of
N when is_integer(N), N > 0 ->
%% We are flapping. Set ignore_timer=true and schedule a
@ -276,12 +310,17 @@ handle_info(tick_check_environment, S) ->
%% state C200 is ever implemented, then it should be
%% implemented via the test_react_to_env style.
erlang:send_after(N*1000, self(), stop_ignoring_timer),
{noreply, S#ch_mgr{ignore_timer=true}};
{noreply, S3#ch_mgr{ignore_timer=true}};
_ ->
{noreply, S2}
{noreply, S3}
end;
handle_info(stop_ignoring_timer, S) ->
{noreply, S#ch_mgr{ignore_timer=false}};
handle_info({'DOWN',_Ref,process,Worker,Res},
#ch_mgr{repair_worker=Worker}=S)->
{noreply, S#ch_mgr{ignore_timer=false,
repair_worker=undefined,
repair_final_status=Res}};
handle_info(Msg, S) ->
case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]);
_ -> ok
@ -303,12 +342,18 @@ make_none_projection(MyName, All_list, MembersDict) ->
machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []).
get_my_private_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) ->
get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, private).
get_my_public_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) ->
get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, public).
get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, ProjType) ->
case proplists:get_value(projection_store_registered_name, MgrOpts) of
undefined ->
{DefaultDict, DefaultProj};
Store ->
{ok, P} = machi_projection_store:read_latest_projection(Store,
private),
ProjType),
{P#projection_v1.members_dict, P}
end.
@ -327,8 +372,11 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) ->
set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) ->
FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)],
USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list),
{ok, TRef} = timer:send_interval(USec, tick_check_environment),
%% Perturb the order a little bit, to avoid near-lock-step
%% operations every few ticks.
MSec = calc_sleep_ranked_order(400, 1500, MyName, FLU_list) +
random:uniform(100),
{ok, TRef} = timer:send_interval(MSec, tick_check_environment),
S#ch_mgr{timer=TRef}.
do_cl_write_public_proj(Proj, S) ->
@ -392,7 +440,7 @@ do_cl_read_latest_public_projection(ReadRepairP,
end.
read_latest_projection_call_only(ProjectionType, AllHosed,
#ch_mgr{proj=CurrentProj}=S) ->
#ch_mgr{proj=CurrentProj}=S) ->
#projection_v1{all_members=All_list} = CurrentProj,
All_queried_list = All_list -- AllHosed,
@ -403,7 +451,6 @@ read_latest_projection_call_only(ProjectionType, AllHosed,
Else -> Else
end
end,
%% io:format(user, "All_queried_list ~p\n", [All_queried_list]),
Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
FLU <- All_queried_list],
%% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
@ -508,7 +555,9 @@ calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S,
calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
RelativeToServer, AllHosed, Dbg,
#ch_mgr{name=MyName, runenv=RunEnv1}=S) ->
#ch_mgr{name=MyName,
runenv=RunEnv1,
repair_final_status=RepairFS}=S) ->
#projection_v1{epoch_number=OldEpochNum,
members_dict=MembersDict,
upi=OldUPI_list,
@ -549,6 +598,11 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
if Simulator_p andalso SameEpoch_p ->
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
{NewUPI_list ++ [H], T, RunEnv2};
not Simulator_p
andalso
RepairFS == {repair_final_status, ok} ->
D_foo=[{repair_done, {repair_final_status, ok, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
{NewUPI_list ++ Repairing_list2, [], RunEnv2};
true ->
D_foo=[],
{NewUPI_list, OldRepairing_list, RunEnv2}
@ -679,7 +733,7 @@ rank_projections(Projs, CurrentProj) ->
#projection_v1{all_members=All_list} = CurrentProj,
MemberRank = orddict:from_list(
lists:zip(All_list, lists:seq(1, length(All_list)))),
N = length(All_list),
N = ?MAX_CHAIN_LENGTH + 1,
[{rank_projection(Proj, MemberRank, N), Proj} || Proj <- Projs].
rank_projection(#projection_v1{upi=[]}, _MemberRank, _N) ->
@ -687,7 +741,22 @@ rank_projection(#projection_v1{upi=[]}, _MemberRank, _N) ->
rank_projection(#projection_v1{author_server=Author,
upi=UPI_list,
repairing=Repairing_list}, MemberRank, N) ->
AuthorRank = orddict:fetch(Author, MemberRank),
%% It's possible that there's "cross-talk" across projection
%% stores. For example, we were a chain of [a,b], then the
%% administrator sets a's members_dict to include only a.
%% However, b is still running and has written a public projection
%% suggestion to a, and a has seen it. (Or perhaps b has old
%% chain information from one/many configurations ago, and its
%% projection store was not wiped clean, then b was restarted &
%% begins using its local outdated projection information.)
%%
%% Server b is no longer a member of a's MemberRank scheme, so we
%% need to compensate for this by giving b an extremely low author
%% ranking.
AuthorRank = case orddict:find(Author, MemberRank) of
{ok, Rank} -> Rank;
error -> -(N*N*N*N)
end,
AuthorRank +
( N * length(Repairing_list)) +
(N*N * length(UPI_list)).
@ -705,9 +774,24 @@ do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
{ok, S#ch_mgr{members_dict=MembersDict,
proxies_dict=orddict:from_list(Proxies)}}.
do_react_to_env(#ch_mgr{proj=#projection_v1{epoch_number=Epoch,
members_dict=[]}}=S) ->
{{empty_members_dict, [], Epoch}, S};
do_react_to_env(#ch_mgr{name=MyName,
proj=#projection_v1{epoch_number=Epoch,
members_dict=[]=OldDict}=OldProj,
opts=Opts}=S) ->
%% Read from our local *public* projection store. If some other
%% chain member has written something there, and if we are a
%% member of that chain, then we'll adopt that projection and then
%% start actively humming in that chain.
{NewMembersDict, NewProj} =
get_my_public_proj_boot_info(Opts, OldDict, OldProj),
case orddict:is_key(MyName, NewMembersDict) of
false ->
{{empty_members_dict, [], Epoch}, S};
true ->
{_, S2} = do_set_chain_members_dict(NewMembersDict, S),
{{empty_members_dict, [], Epoch},
S2#ch_mgr{proj=NewProj, members_dict=NewMembersDict}}
end;
do_react_to_env(S) ->
put(react, []),
react_to_env_A10(S).
@ -716,11 +800,44 @@ react_to_env_A10(S) ->
?REACT(a10),
react_to_env_A20(0, S).
react_to_env_A20(Retries, S) ->
react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
?REACT(a20),
init_remember_partition_hack(),
{UnanimousTag, P_latest, ReadExtra, S2} =
do_cl_read_latest_public_projection(true, S),
LastComplaint = get(rogue_server_epoch),
case orddict:is_key(P_latest#projection_v1.author_server,
S#ch_mgr.members_dict) of
false when P_latest#projection_v1.epoch_number /= LastComplaint ->
put(rogue_server_epoch, P_latest#projection_v1.epoch_number),
Rogue = P_latest#projection_v1.author_server,
error_logger:info_msg("Chain manager ~w found latest public "
"projection ~w has author ~w not a member "
"of our members list ~w. Please check "
"chain membership on this "
"rogue chain manager ~w.\n",
[S#ch_mgr.name,
P_latest#projection_v1.epoch_number,
Rogue,
[K || {K,_} <- orddict:to_list(S#ch_mgr.members_dict)],
Rogue]);
_ ->
ok
end,
case lists:member(MyName, P_latest#projection_v1.all_members) of
false when P_latest#projection_v1.epoch_number /= LastComplaint,
P_latest#projection_v1.all_members /= [] ->
put(rogue_server_epoch, P_latest#projection_v1.epoch_number),
error_logger:info_msg("Chain manager ~p found latest public "
"projection ~p has author ~p has a "
"members list ~p that does not include me.\n",
[S#ch_mgr.name,
P_latest#projection_v1.epoch_number,
P_latest#projection_v1.author_server,
P_latest#projection_v1.all_members]);
_ ->
ok
end,
%% The UnanimousTag isn't quite sufficient for our needs. We need
%% to determine if *all* of the UPI+Repairing FLUs are members of
@ -1140,10 +1257,10 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
{X, S2} = gimme_random_uniform(100, S),
if X < 80 ->
?REACT({b10, ?LINE, [flap_stop]}),
ThrottleTime = if FlapLimit < 500 -> 1;
FlapLimit < 1000 -> 5;
FlapLimit < 5000 -> 10;
true -> 30
ThrottleTime = if P_newprop_flap_count < 500 -> 1;
P_newprop_flap_count < 1000 -> 5;
P_newprop_flap_count < 5000 -> 10;
true -> 30
end,
FinalProps = [{my_flap_limit, FlapLimit},
{throttle_seconds, ThrottleTime}],
@ -1429,10 +1546,8 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
AllHosed = []
end,
FlappingI = {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}},
{all_hosed, AllHosed},
{all_flap_counts, lists:sort(AllFlapCounts)},
{bad,BadFLUs}]},
FlappingI = make_flapping_i(NewFlapStart, NewFlaps, AllHosed,
AllFlapCounts, BadFLUs),
Dbg2 = [FlappingI|P_newprop#projection_v1.dbg],
%% TODO: 2015-03-04: I'm growing increasingly suspicious of
%% the 'runenv' variable that's threaded through all this code.
@ -1452,6 +1567,15 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
{machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}),
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}.
make_flapping_i() ->
make_flapping_i({{epk,-1},?NOT_FLAPPING}, 0, [], [], []).
make_flapping_i(NewFlapStart, NewFlaps, AllHosed, AllFlapCounts, BadFLUs) ->
{flapping_i, [{flap_count, {NewFlapStart, NewFlaps}},
{all_hosed, AllHosed},
{all_flap_counts, lists:sort(AllFlapCounts)},
{bad,BadFLUs}]}.
projection_transitions_are_sane(Ps, RelativeToServer) ->
projection_transitions_are_sane(Ps, RelativeToServer, false).
@ -1717,7 +1841,8 @@ sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
USec.
calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
Front = lists:takewhile(fun(X) -> X /= FLU end, lists:sort(FLU_list)),
Front = lists:takewhile(fun(X) -> X /= FLU end,
lists:reverse(lists:sort(FLU_list))),
Index = length(Front),
NumNodes = length(FLU_list),
SleepChunk = if NumNodes == 0 -> 0;
@ -1800,6 +1925,73 @@ make_chmgr_regname(B) when is_binary(B) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_start_repair(
#ch_mgr{name=MyName,
repair_worker=undefined,
proj=#projection_v1{creation_time=Start,
upi=[_|_]=UPI,
repairing=[_|_]}}=S) ->
RepairId = {MyName, os:timestamp()},
RepairOpts = [{repair_mode, repair}, verbose, {repair_id, RepairId}],
%% RepairOpts = [{repair_mode, check}, verbose],
RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
LastUPI = lists:last(UPI),
case timer:now_diff(os:timestamp(), Start) div 1000000 of
N when MyName == LastUPI,
N >= ?REPAIR_START_STABILITY_TIME ->
{WorkerPid, _Ref} = spawn_monitor(RepairFun),
S#ch_mgr{repair_worker=WorkerPid,
repair_start=os:timestamp(),
repair_final_status=undefined};
_ ->
S
end;
perhaps_start_repair(S) ->
S.
do_repair(
#ch_mgr{name=MyName,
proj=#projection_v1{upi=UPI,
repairing=[_|_]=Repairing,
members_dict=MembersDict}}=_S_copy,
Opts, ap_mode=RepairMode) ->
T1 = os:timestamp(),
RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n",
[MyName, UPI, Repairing, RepairMode, RepairId]),
ETS = ets:new(repair_stats, [private, set]),
ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes,
t_out_files, t_out_chunks, t_out_bytes,
t_bad_chunks, t_elapsed_seconds],
[ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys],
Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI,
MembersDict, ETS, Opts),
T2 = os:timestamp(),
Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
ets:insert(ETS, {t_elapsed_seconds, Elapsed}),
Summary = case Res of ok -> "success";
_ -> "FAILURE"
end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: "
"~w\nStats ~w\n",
[Summary, MyName, UPI, RepairMode, RepairId,
Res, Stats]),
ets:delete(ETS),
exit({repair_final_status, Res}).
sanitize_repair_state(#ch_mgr{repair_final_status=Res,
proj=#projection_v1{upi=[_|_]}}=S)
when Res /= undefined ->
S#ch_mgr{repair_worker=undefined, repair_start=undefined,
repair_final_status=undefined};
sanitize_repair_state(S) ->
S.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_call_t(S, Partitions, FLU, DoIt) ->
try
perhaps_call(S, Partitions, FLU, DoIt)

385
src/machi_chain_repair.erl Normal file
View file

@ -0,0 +1,385 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a
%% proxy-process style API for hiding messy details such as TCP
%% connection/disconnection with the remote Machi server.
%%
%% Machi is intentionally avoiding using distributed Erlang for
%% Machi's communication. This design decision makes Erlang-side code
%% more difficult &amp; complex, but it's the price to pay for some
%% language independence. Later in Machi's life cycle, we need to
%% (re-)implement some components in a non-Erlang/BEAM-based language.
%%
%% This module implements a "man in the middle" proxy between the
%% Erlang client and Machi server (which is on the "far side" of a TCP
%% connection to somewhere). This proxy process will always execute
%% on the same Erlang node as the Erlang client that uses it. The
%% proxy is intended to be a stable, long-lived process that survives
%% TCP communication problems with the remote server.
-module(machi_chain_repair).
-include("machi_projection.hrl").
-define(SHORT_TIMEOUT, 5*1000).
-define(LONG_TIMEOUT, 60*1000).
-define(MAX_OFFSET, 999*1024*1024*1024*1024*1024*1024*1024).
%% These macros assume there's a bound variable called Verb.
-define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end).
-define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end).
-ifdef(TEST).
-compile(export_all).
-endif. % TEST
-export([repair/7]).
repair_cp(_Src, _Dst, _MembersDict, _Opts) ->
%% TODO: add missing function: wipe away any trace of chunks
%% are present on Dst but missing on Src.
exit(todo_cp_mode).
repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
%% Use process dict so that 'after' clause can always quit all
%% proxy pids.
put(proxies_dict, orddict:new()),
Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption!
RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, true),
Res = try
[begin
{ok, Proxy} = machi_proxy_flu1_client:start_link(P),
Add(FLU, Proxy)
end || {FLU,P} <- MembersDict, lists:member(FLU, OurFLUs)],
ProxiesDict = get(proxies_dict),
D = dict:new(),
D2 = lists:foldl(fun({FLU, Proxy}, Dict) ->
get_file_lists(Proxy, FLU, Dict)
end, D, ProxiesDict),
MissingFileSummary = make_missing_file_summary(D2, OurFLUs),
?VERB("MissingFileSummary ~p\n", [MissingFileSummary]),
[ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs],
%% Repair files from perspective of Src, i.e. tail(UPI).
SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(
SrcProxy, ?SHORT_TIMEOUT),
?VERB("Make repair directives: "),
Ds =
[{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size, EpochID,
Verb,
Src, OurFLUs, ProxiesDict, ETS)} ||
{File, {Size, _MissingList}} <- MissingFileSummary],
?VERB(" done\n"),
[begin
[{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}),
?VERB("Out-of-sync data for FLU ~p: ~s MBytes\n",
[FLU, mbytes(Bytes)])
end || FLU <- OurFLUs],
?VERB("Execute repair directives: "),
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
Verb, OurFLUs, ProxiesDict, ETS),
?VERB(" done\n"),
ok
catch
What:Why ->
Stack = erlang:get_stacktrace(),
io:format(user, "What Why ~p ~p @\n\t~p\n",
[What, Why, Stack]),
{error, {What, Why, Stack}}
after
[(catch machi_proxy_flu1_client:quit(Pid)) ||
Pid <- orddict:to_list(get(proxies_dict))]
end,
Res.
%% Create a list of servers where the file is completely missing.
%% In the "demo day" implementation and in an early integration WIP,
%% this was a useful thing. TODO: Can this be removed?
make_missing_file_summary(Dict, AllFLUs) ->
%% FileFilterFun = fun(_) -> true end,
FoldRes = lists:sort(dict:to_list(Dict)),
%% NOTE: MissingFileSummary = [{File, {FileSize, ServersMissingFrom}}]
MissingFileSummary =
[begin
{GotIt, Sizes} = lists:unzip(GotSizes),
Size = lists:max(Sizes),
Missing = {File, {Size, AllFLUs -- GotIt}},
Missing
end || {File, GotSizes} <- FoldRes %% , FileFilterFun(File)
],
MissingFileSummary.
get_file_lists(Proxy, FLU_name, D) ->
{ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH,
?SHORT_TIMEOUT),
lists:foldl(fun({Size, File}, Dict) ->
dict:append(File, {FLU_name, Size}, Dict)
end, D, Res).
%% Wow, it's so easy to bikeshed this into a 1 year programming exercise.
%%
%% TODO: There are a lot of areas for exploiting parallelism here.
%% I've set the bikeshed aside for now, but "make repair faster" has a
%% lot of room for exploiting concurrency, overlapping reads & writes,
%% etc etc. There are also lots of different trade-offs to make with
%% regard to RAM use vs. disk use.
%%
%% TODO: There's no reason why repair can't be done 1).in parallel
%% across multiple repairees, and/or 2). with multiple byte ranges in
%% the same file, and/or 3). with bigger chunks.
%%
%% 1. Optimization
%% 2. Optimization
%% 3. Optimization, but it would be the easiest to implement, e.g. use
%% constant-sized 4MB chunks. Unfortuntely, it would also destroy
%% the ability to verify here that the chunk checksums are correct
%% *and* also propagate the correct checksum metadata to the
%% destination FLU.
%% As an additional optimization, add a bit of #2 to start the next
%% read while the current write is still in progress.
%%
%% Most/all of this could be executed in parallel on each FLU relative to
%% its own files. Then, in another TODO option, perhaps build a Merkle tree
%% or other summary of the local files & send that data structure to the
%% repair coordinator.
%%
%% Also, as another TODO note, repair_both_present() in the
%% prototype/demo-day code uses an optimization of calculating the MD5
%% checksum of the chunk checksum data as it arrives, and if the two MD5s
%% match, then we consider the two files in sync. If there isn't a match,
%% then we sort the lines and try another MD5, and if they match, then we're
%% in sync. In theory, that's lower overhead than the procedure used here.
%%
%% NOTE that one reason I chose the "directives list" method is to have an
%% option, later, of choosing to repair a subset of repairee FLUs if there
%% is a big discrepency between out of sync files: e.g., if FLU x has N
%% bytes out of sync but FLU y has 50N bytes out of sync, then it's likely
%% better to repair x only so that x can return to the UPI list quickly.
%% Also, in the event that all repairees are roughly comparably out of sync,
%% then the repair network traffic can be minimized by reading each chunk
%% only once.
make_repair_compare_fun(SrcFLU) ->
fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a},
{{Offset_X, _Sz_b, _CS_b, FLU_b}, _N_b}) ->
%% The repair source FLU always sorts less/earlier than anything else.
if FLU_a == SrcFLU ->
true;
FLU_b == SrcFLU ->
false;
true ->
%% Implicitly, smallest offset first.
%% Secondarily (and implicitly), sort smallest chunk size first
FLU_a < FLU_b
end;
(T_a, T_b) ->
%% See implicitly comments above
T_a =< T_b
end.
make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
Verb, Src, FLUs0, ProxiesDict, ETS) ->
true = (Size < ?MAX_OFFSET),
FLUs = lists:usort(FLUs0),
C0 = [begin
%% erlang:garbage_collect(),
Proxy = orddict:fetch(FLU, ProxiesDict),
OffSzCs = case machi_proxy_flu1_client:checksum_list(
Proxy, EpochID, File, ?LONG_TIMEOUT) of
{ok, X} -> X;
{error, no_such_file} -> []
end,
[{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker
++
[{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs]
end || FLU <- FLUs],
C1 = lists:append(C0),
%% erlang:garbage_collect(),
C2 = lists:sort(make_repair_compare_fun(Src), C1),
%% erlang:garbage_collect(),
Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS),
Ds.
make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS) ->
?VERB("."),
make_repair_directives3(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, []).
make_repair_directives3([{?MAX_OFFSET, 0, <<>>, _FLU}|_Rest],
_ConsistencyMode, _RepairMode,
_File, _Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) ->
lists:reverse(Acc);
make_repair_directives3([{Offset, Size, CSum, _FLU}=A|Rest0],
ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) ->
{As0, Rest1} = take_same_offset_size(Rest0, Offset, Size),
As = [A|As0],
%% Sanity checking time
case lists:all(fun({_, _, Cs, _}) when Cs == CSum -> true;
(_) -> false
end, As) of
true ->
ok;
false ->
%% TODO: Pathology: someone has the wrong checksum.
%% 1. Fetch Src's chunk. If checksum is valid, use this chunk
%% to repair any invalid value.
%% 2. If Src's chunk is invalid, then check for other copies
%% in the UPI. If there is a valid chunk there, use it to
%% repair any invalid value.
%% 3a. If there is no valid UPI chunk, then delete this
%% byte range from all FLUs
%% 3b. Log big warning about data loss.
%% 4. Log any other checksum discrepencies as they are found.
exit({todo_repair_sanity_check, ?LINE, File, Offset, As})
end,
%% List construction guarantees us that there's at least one ?MAX_OFFSET
%% item remains. Sort order + our "taking" of all exact Offset+Size
%% tuples guarantees that if there's a disagreement about chunk size at
%% this offset, we can look ahead exactly one to see if there is sanity
%% or not.
[{Offset_next, _Size_next, _, _}=A_next|_] = Rest1,
if Offset + Size =< Offset_next ->
ok;
true ->
exit({todo_repair_sanity_check, ?LINE, File, Offset, Size,
next_is, A_next})
end,
Do = if ConsistencyMode == ap_mode ->
Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As],
Missing = FLUs -- Gots,
_ThisSrc = case lists:member(Src, Gots) of
true -> Src;
false -> hd(Gots)
end,
[ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) ||
FLU_m <- Missing],
if Missing == [] ->
noop;
true ->
{copy, A, Missing}
end;
ConsistencyMode == cp_mode ->
exit({todo_cp_mode, ?MODULE, ?LINE})
end,
Acc2 = if Do == noop -> Acc;
true -> [Do|Acc]
end,
make_repair_directives3(Rest1,
ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, Acc2).
take_same_offset_size(L, Offset, Size) ->
take_same_offset_size(L, Offset, Size, []).
take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) ->
take_same_offset_size(Rest, Offset, Size, [A|Acc]);
take_same_offset_size(Rest, _Offset, _Size, Acc) ->
{Acc, Rest}.
execute_repair_directives(ap_mode=_ConsistencyMode, Ds, _Src, EpochID, Verb,
_OurFLUs, ProxiesDict, ETS) ->
{_,_,_,_} = lists:foldl(fun execute_repair_directive/2,
{ProxiesDict, EpochID, Verb, ETS}, Ds),
ok.
execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks},
{in_bytes, t_in_bytes}, {out_files, t_out_files},
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc2) ->
SrcP = orddict:fetch(MySrc, ProxiesDict),
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []);
_ -> ok
end,
_T1 = os:timestamp(),
{ok, Chunk} = machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT),
_T2 = os:timestamp(),
case machi_util:checksum_chunk(Chunk) of
CSum_now when CSum_now == CSum ->
[begin
DstP = orddict:fetch(DstFLU, ProxiesDict),
_T3 = os:timestamp(),
ok = machi_proxy_flu1_client:write_chunk(
DstP, EpochID, File, Offset, Chunk,
?SHORT_TIMEOUT),
_T4 = os:timestamp()
end || DstFLU <- MyDsts],
ets:update_counter(ETS, in_chunks, 1),
ets:update_counter(ETS, in_bytes, Size),
N = length(MyDsts),
ets:update_counter(ETS, out_chunks, N),
ets:update_counter(ETS, out_bytes, N*Size),
Acc2;
CSum_now ->
error_logger:error_msg(
"TODO: Checksum failure: "
"file ~p offset ~p size ~p: "
"expected ~p got ~p\n",
[File, Offset, Size, CSum, CSum_now]),
case ets:update_counter(ETS, t_bad_chunks, 1) of
N when N > 100 ->
throw(todo_wow_so_many_errors_so_verbose);
_ ->
ok
end,
Acc2
end
end,
ok = lists:foldl(F, ok, Cmds),
%% Copy this file's stats to the total counts.
[ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) ||
{L_K, T_K} <- EtsKeys],
Acc.
mbytes(0) ->
"0.0";
mbytes(Size) ->
lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])).
-ifdef(TEST).
repair_compare_fun_test() ->
F = make_repair_compare_fun(b),
List = [{{1,10,x,b},y},{{50,10,x,a},y},{{50,10,x,b},y},{{50,10,x,c},y},{{90,10,x,d},y}],
Input = lists:reverse(lists:sort(List)),
%% Although the merge func should never have two of the same FLU
%% represented, it doesn't matter for the purposes of this test.
%% 1. Smaller offset (element #1) wins, else...
%% 2. The FLU (element #2) that's the repair source always wins, else...
%% 3. The FLU with smallest name wins.
Expect = [{{1,10,x,b},y},{{50,10,x,b},y},{{50,10,x,a},y},{{50,10,x,c},y},{{90,10,x,d},y}],
Expect = lists:sort(F, Input).
-endif. % TEST

View file

@ -114,30 +114,50 @@ main2(FluName, TcpPort, DataDir, Rest) ->
DPs ->
{lists:keydelete(dbg, 1, Rest), DPs}
end,
{SendAppendPidToProj_p, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
undefined ->
RN = make_projection_server_regname(FluName),
{ok, PP} =
machi_projection_store:start_link(RN, DataDir, undefined),
{true, PP};
RN ->
{false, whereis(RN)}
end,
InitialWedged_p = proplists:get_value(initial_wedged, DbgProps),
ProjRes = machi_projection_store:read_latest_projection(ProjectionPid,
private),
{Wedged_p, EpochId} =
if InitialWedged_p == undefined,
is_tuple(ProjRes), element(1, ProjRes) == ok ->
{ok, Proj} = ProjRes,
{false, {Proj#projection_v1.epoch_number,
Proj#projection_v1.epoch_csum}};
InitialWedged_p == false ->
{false, ?DUMMY_PV1_EPOCH};
true ->
{true, undefined}
end,
S0 = #state{flu_name=FluName,
proj_store=ProjectionPid,
tcp_port=TcpPort,
data_dir=DataDir,
wedged=proplists:get_value(initial_wedged, DbgProps, true),
wedged=Wedged_p,
etstab=ets_table_name(FluName),
epoch_id=undefined,
epoch_id=EpochId,
dbg_props=DbgProps,
props=Props},
AppendPid = start_append_server(S0, self()),
receive
append_server_ack -> ok
end,
{_ProjRegName, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
undefined ->
RN = make_projection_server_regname(FluName),
{ok, PP} =
machi_projection_store:start_link(RN, DataDir, AppendPid),
{RN, PP};
RN ->
{RN, whereis(RN)}
end,
S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid},
if SendAppendPidToProj_p ->
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
AppendPid);
true ->
ok
end,
S1 = S0#state{append_pid=AppendPid},
ListenPid = start_listen_server(S1),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
@ -174,13 +194,15 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps}=S) ->
run_append_server(FluPid, AckPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
InitialWedged = proplists:get_value(initial_wedged, DbgProps, true),
ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}),
%% InitialWedged = proplists:get_value(initial_wedged, DbgProps, true),
%% ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
AckPid ! append_server_ack,
append_server_loop(FluPid, S#state{etstab=TID}).
@ -192,13 +214,13 @@ listen_server_loop(LSock, S) ->
append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
AppendServerPid = self(),
receive
{seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p ->
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p ->
From ! wedged,
append_server_loop(FluPid, S);
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir, AppendServerPid) end),
%% DataDir, FluPid) end),
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
spawn(fun() -> append_server_dispatch(From, Prefix,
Chunk, CSum, Extra,
DataDir, AppendServerPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean, EpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}),
@ -213,14 +235,19 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
append_server_loop(FluPid, S)
end.
-define(EpochIDSpace, (4+20)).
-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee!
decode_epoch_id(EpochIDHex) ->
<<EpochNum:(4*8)/big, EpochCSum/binary>> =
machi_util:hexstr_to_bin(EpochIDHex),
{EpochNum, EpochCSum}.
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 600*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
@ -228,21 +255,26 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
case Line of
%% For normal use
<<"A ",
_EpochIDRaw:(?EpochIDSpace)/binary,
LenHex:8/binary,
EpochIDHex:(?EpochIDSpace)/binary,
LenHex:8/binary, ExtraHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(FluName, Sock, LenHex, Prefix);
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_append(FluName, Sock, LenHex, ExtraHex,
Prefix);
<<"R ",
EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF/binary, "\n">> ->
EpochID = decode_epoch_id(EpochIDHex),
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir,
EpochIDRaw, S);
<<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
EpochID, S);
<<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_listing(Sock, DataDir, S);
<<"C ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_checksum_listing(Sock, File, DataDir, S);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),
@ -252,20 +284,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
exit(normal);
%% For "internal" replication only.
<<"W-repl ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
%% For data migration only.
<<"DEL-migration ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_delete_migration_only(Sock, File, DataDir);
%% For erasure coding hackityhack
<<"TRUNC-hack--- ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_truncate_hackityhack(Sock, File, DataDir);
<<"PROJ ", LenHex:8/binary, "\n">> ->
do_projection_command(Sock, LenHex, S);
@ -273,6 +308,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
do_wedge_status(FluName, Sock);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
io:format(user, "TODO: Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
catch gen_tcp:close(Sock),
exit(normal)
@ -283,16 +319,16 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
exit(normal)
end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) ->
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) ->
Pid = write_server_get_pid(Prefix, DataDir, LinkPid),
Pid ! {seq_append, From, Prefix, Chunk, CSum},
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra},
exit(normal).
do_net_server_append(FluName, Sock, LenHex, Prefix) ->
do_net_server_append(FluName, Sock, LenHex, ExtraHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(FluName, Sock, LenHex, Prefix);
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
@ -305,13 +341,14 @@ sanitize_file_string(Str) ->
error
end.
do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
<<Extra:32/big>> = machi_util:hexstr_to_bin(ExtraHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum_chunk(Chunk),
try
FluName ! {seq_append, self(), Prefix, Chunk, CSum}
FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra}
catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
@ -348,7 +385,7 @@ do_wedge_status(FluName, Sock) ->
ok = gen_tcp:send(Sock, Reply).
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, S) ->
EpochID, S) ->
{Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
@ -368,16 +405,16 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
EpochID, Wedged_p, CurrentEpochId).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
case {Wedged_p, sanitize_file_string(FileBin)} of
{false, ok} ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
{true, _} ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
{_, __} ->
@ -386,7 +423,7 @@ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
@ -402,7 +439,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
@ -410,20 +447,20 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId)
EpochID, Wedged_p, CurrentEpochId)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
@ -443,7 +480,7 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
EpochID, Wedged_p, CurrentEpochId).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
@ -673,8 +710,12 @@ seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
ok = file:pwrite(FHd, Offset, Chunk),
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
if Chunk /= <<>> ->
ok = file:pwrite(FHd, Offset, Chunk);
true ->
ok
end,
From ! {assignment, Offset, File},
Len = byte_size(Chunk),
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
@ -683,7 +724,7 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len);
FileNum, Offset + Len + Extra);
{sync_stuff, FromPid, Ref} ->
file:sync(FHc),
FromPid ! {sync_finished, Ref},

View file

@ -28,6 +28,7 @@
-export([
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
@ -42,7 +43,10 @@
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
quit/1,
%% Connection management API
connected_p/1, connect/1, disconnect/1
]).
%% For "internal" replication only.
-export([
@ -68,6 +72,7 @@
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: {w,atom(),term()}.
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
@ -76,10 +81,10 @@
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk).
append_chunk2(Sock, EpochID, Prefix, Chunk, 0).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
@ -88,16 +93,50 @@ append_chunk(Sock, EpochID, Prefix, Chunk) ->
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
append_chunk2(Sock, EpochID, Prefix, Chunk)
append_chunk2(Sock, EpochID, Prefix, Chunk, 0)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra)
after
disconnect(Sock)
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
@ -114,20 +153,20 @@ read_chunk(Sock, EpochID, File, Offset, Size)
{error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_chunk2(Sock, EpochID, File, Offset, Size)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), epoch_id(), file_name()) ->
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
checksum_list(Sock, EpochID, File) ->
checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'.
@ -136,18 +175,18 @@ checksum_list(Sock, EpochID, File) when is_port(Sock) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
checksum_list2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port(), epoch_id()) ->
-spec list_files(port_wrap(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Sock, EpochID) when is_port(Sock) ->
list_files(Sock, EpochID) ->
list2(Sock, EpochID).
%% @doc Fetch the list of all files on the remote FLU.
@ -155,19 +194,19 @@ list_files(Sock, EpochID) when is_port(Sock) ->
-spec list_files(inet_host(), inet_port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
list2(Sock, EpochID)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port()) ->
-spec wedge_status(port_wrap()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Sock) when is_port(Sock) ->
wedge_status(Sock) ->
wedge_status2(Sock).
%% @doc Fetch the wedge status from the remote FLU.
@ -175,16 +214,16 @@ wedge_status(Sock) when is_port(Sock) ->
-spec wedge_status(inet_host(), inet_port()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
wedge_status2(Sock)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port(), projection_type()) ->
-spec get_latest_epoch(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -197,16 +236,16 @@ get_latest_epoch(Sock, ProjType)
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_latest_epoch2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port(), projection_type()) ->
-spec read_latest_projection(port_wrap(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -219,17 +258,17 @@ read_latest_projection(Sock, ProjType)
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_latest_projection2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
-spec read_projection(port_wrap(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_projection(Sock, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
read_projection2(Sock, ProjType, Epoch).
@ -238,19 +277,19 @@ read_projection(Sock, ProjType, Epoch)
-spec read_projection(inet_host(), inet_port(),
projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
{ok, projection()} | {error, not_written} | {error, term()}.
read_projection(Host, TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_projection2(Sock, ProjType, Epoch)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port(), projection_type(), projection()) ->
-spec write_projection(port_wrap(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
@ -265,16 +304,16 @@ write_projection(Sock, ProjType, Proj)
write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
write_projection2(Sock, ProjType, Proj)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(port(), projection_type()) ->
-spec get_all_projections(port_wrap(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -287,16 +326,16 @@ get_all_projections(Sock, ProjType)
{ok, [projection()]} | {error, term()}.
get_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(port(), projection_type()) ->
-spec list_all_projections(port_wrap(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -309,20 +348,37 @@ list_all_projections(Sock, ProjType)
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
list_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port()) ->
-spec quit(port_wrap()) ->
ok.
quit(Sock) when is_port(Sock) ->
catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)),
catch gen_tcp:close(Sock),
quit(Sock) ->
catch (_ = w_send(Sock, <<"QUIT\n">>)),
disconnect(Sock),
ok.
connected_p({w,tcp,Sock}) ->
case (catch inet:peername(Sock)) of
{ok, _} -> true;
_ -> false
end;
connected_p(_) ->
false.
connect(#p_srvr{}=P) ->
w_connect(P).
disconnect({w,tcp,_Sock}=WS) ->
w_close(WS),
ok;
disconnect(_) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -330,7 +386,7 @@ quit(Sock) when is_port(Sock) ->
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(Sock, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
@ -344,19 +400,19 @@ write_chunk(Sock, EpochID, File, Offset, Chunk)
ok | {error, error_general()} | {error, term()}.
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
write_chunk2(Sock, EpochID, File, Offset, Chunk)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(port(), epoch_id(), file_name()) ->
-spec delete_migration(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
delete_migration(Sock, EpochID, File) ->
delete_migration2(Sock, EpochID, File).
%% @doc Restricted API: Delete a file after it has been successfully
@ -365,19 +421,19 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) ->
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
delete_migration2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(port(), epoch_id(), file_name()) ->
-spec trunc_hack(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
trunc_hack(Sock, EpochID, File) ->
trunc_hack2(Sock, EpochID, File).
%% @doc Restricted API: Truncate a file after it has been successfully
@ -386,16 +442,16 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
trunc_hack2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
erase(bad_sock),
try
%% TODO: add client-side checksum to the server's protocol
@ -405,11 +461,13 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10],
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK ", OffsetHex:16/binary, " ",
@ -436,21 +494,22 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10],
ok = w_send(Sock, CmdLF),
case w_recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
{ok, _Chunk}=Res = w_recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
{ok, OldOpts} = w_getopts(Sock, [packet]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, Else2} = w_recv(Sock, 0),
ok = w_setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
@ -485,13 +544,14 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
list2(Sock, EpochID) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
Res = list3(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
Res = list3(w_recv(Sock, 0), Sock),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, Res};
{ok, <<"ERROR WEDGED\n">>} ->
{error, wedged};
@ -511,19 +571,19 @@ list3({ok, Line}, Sock) ->
FileLen = byte_size(Line) - 16 - 1 - 1,
<<SizeHex:16/binary, " ", File:FileLen/binary, _/binary>> = Line,
Size = machi_util:hexstr_to_int(SizeHex),
[{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)];
[{Size, File}|list3(w_recv(Sock, 0), Sock)];
list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
wedge_status2(Sock) ->
try
ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, <<"OK ",
BooleanHex:2/binary, " ",
EpochHex:8/binary, " ",
CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, [{packet, raw}]),
CSumHex:40/binary, "\n">>} = w_recv(Sock, 0),
ok = w_setopts(Sock, [{packet, raw}]),
Boolean = if BooleanHex == <<"00">> -> false;
BooleanHex == <<"01">> -> true
end,
@ -541,16 +601,17 @@ checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = w_send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
put(status, ok), % may be unset later
RestLen = byte_size(Rest) - 1,
<<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
@ -559,7 +620,11 @@ checksum_list2(Sock, EpochID, File) ->
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else})
throw({server_protocol_error, Else});
{error, closed} ->
throw({error, closed});
Else ->
throw(Else)
end
catch
throw:Error ->
@ -571,11 +636,11 @@ checksum_list2(Sock, EpochID, File) ->
end.
checksum_list_fast(Sock, 0) ->
{ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2),
{ok, <<".\n">> = _Line} = w_recv(Sock, 2),
[];
checksum_list_fast(Sock, Remaining) ->
Num = erlang:min(Remaining, 1024*1024),
{ok, Bytes} = gen_tcp:recv(Sock, Num),
{ok, Bytes} = w_recv(Sock, Num),
[Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
checksum_list_finish(Chunks) ->
@ -599,7 +664,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
%% TODO: add client-side checksum to the server's protocol
%% _ = machi_util:checksum_chunk(Chunk),
File = machi_util:make_binary(File0),
@ -609,10 +675,10 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex,
Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex,
LenHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK\n">> ->
@ -637,11 +703,12 @@ delete_migration2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>],
ok = w_send(Sock, Cmd),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
@ -666,11 +733,12 @@ trunc_hack2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>],
ok = w_send(Sock, Cmd),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
@ -723,18 +791,22 @@ do_projection_common(Sock, ProjCmd) ->
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Line} = gen_tcp:recv(Sock, 0),
case Line of
<<"OK ", ResLenHex:8/binary, "\n">> ->
ResLen = machi_util:hexstr_to_int(ResLenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, ResBin} = gen_tcp:recv(Sock, ResLen),
ok = inet:setopts(Sock, [{packet, line}]),
binary_to_term(ResBin);
Else ->
{error, Else}
ok = w_send(Sock, [Cmd, ProjCmdBin]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, Line} ->
case Line of
<<"OK ", ResLenHex:8/binary, "\n">> ->
ResLen = machi_util:hexstr_to_int(ResLenHex),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, ResBin} = w_recv(Sock, ResLen),
ok = w_setopts(Sock, [{packet, line}]),
binary_to_term(ResBin);
Else ->
{error, Else}
end;
{error, _} = Bad ->
throw(Bad)
end
catch
throw:Error ->
@ -744,3 +816,43 @@ do_projection_common(Sock, ProjCmd) ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})->
try
case proplists:get_value(session_proto, Props, tcp) of
tcp ->
Sock = machi_util:connect(Host, Port),
{w,tcp,Sock};
%% sctp ->
%% %% TODO: not implemented
%% {w,sctp,Sock}
ssl ->
%% TODO: veryveryuntested
SslOptions = proplists:get_value(ssl_options, Props),
Sock = machi_util:connect(Port, Port),
{ok, SslSock} = ssl:connect(Sock, SslOptions),
{w,ssl,SslSock}
end
catch
_:_ ->
undefined
end.
w_close({w,tcp,Sock}) ->
catch gen_tcp:close(Sock),
ok.
w_recv({w,tcp,Sock}, Amt) ->
gen_tcp:recv(Sock, Amt).
w_send({w,tcp,Sock}, IoData) ->
gen_tcp:send(Sock, IoData).
w_getopts({w,tcp,Sock}, Opts) ->
inet:getopts(Sock, Opts).
w_setopts({w,tcp,Sock}, Opts) ->
inet:setopts(Sock, Opts).

View file

@ -59,6 +59,11 @@ new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list,
is_list(Repairing_list), is_list(Dbg), is_list(Dbg2) ->
MembersDict = make_members_dict(MembersDict0),
All_list = [Name || {Name, _P} <- MembersDict],
if length(All_list) =< ?MAX_CHAIN_LENGTH ->
ok;
true ->
exit(max_chain_length_error)
end,
true = lists:all(fun(X) when is_atom(X) orelse is_binary(X) -> true;
(_) -> false
end, All_list),

View file

@ -50,12 +50,13 @@
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3
]).
-export([set_wedge_notify_pid/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(NO_EPOCH, {-1,<<0:(20*8)/big>>}).
-define(NO_EPOCH, ?DUMMY_PV1_EPOCH).
-record(state, {
public_dir = "" :: string(),
@ -148,6 +149,9 @@ list_all_projections(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {list_all_projections, ProjType}, Timeout).
set_wedge_notify_pid(PidSpec, NotifyWedgeStateChanges) ->
gen_server:call(PidSpec, {set_wedge_notify_pid, NotifyWedgeStateChanges}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
g_call(PidSpec, Arg, Timeout) ->
@ -207,6 +211,8 @@ handle_call({{list_all_projections, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
{reply, {{ok, find_all(Dir)}, LC2}, S};
handle_call({set_wedge_notify_pid, NotifyWedgeStateChanges}, _From, S) ->
{reply, ok, S#state{wedge_notify_pid=NotifyWedgeStateChanges}};
handle_call(_Request, _From, S) ->
Reply = whaaaaaaaaaaaaa,
{reply, Reply, S}.
@ -258,7 +264,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
ok = file:close(FH),
EffectiveProj = machi_chain_manager1:inner_projection_or_self(Proj),
EffectiveEpoch = EffectiveProj#projection_v1.epoch_number,
EpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum},
EpochId = {Epoch, Proj#projection_v1.epoch_csum},
EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum},
%%
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
@ -266,7 +273,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
%% This is a regular projection, i.e.,
%% does not have an inner proj.
update_wedge_state(
S#state.wedge_notify_pid, true, EpochId);
S#state.wedge_notify_pid, true,
EffectiveEpochId);
Epoch /= EffectiveEpoch ->
%% This projection has an inner proj.
%% The outer proj is flapping, so we do
@ -277,7 +285,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
ProjType == private,
Epoch > element(1, S#state.max_private_epoch) ->
update_wedge_state(
S#state.wedge_notify_pid, false, EpochId),
S#state.wedge_notify_pid, false,
EffectiveEpochId),
S#state{max_private_epoch=EpochId};
true ->
S

View file

@ -51,12 +51,14 @@
-export([
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
%% %% Projection API
get_epoch_id/1, get_epoch_id/2,
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
@ -65,15 +67,16 @@
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
quit/1,
%% Internal API
write_chunk/5, write_chunk/6
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(FLU_C, machi_flu1_client).
-record(state, {
i :: #p_srvr{},
sock :: 'undefined' | port()
@ -99,6 +102,21 @@ append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}},
Timeout).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk_extra, EpochID, Prefix,
Chunk, ChunkExtra}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
@ -143,6 +161,16 @@ wedge_status(PidSpec, Timeout) ->
gen_server:call(PidSpec, {req, {wedge_status}},
Timeout).
%% @doc Get the `epoch_id()' of the FLU's current/latest projection.
get_epoch_id(PidSpec) ->
get_epoch_id(PidSpec, infinity).
%% @doc Get the `epoch_id()' of the FLU's current/latest projection.
get_epoch_id(PidSpec, Timeout) ->
gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
@ -215,6 +243,19 @@ list_all_projections(PidSpec, ProjType, Timeout) ->
quit(PidSpec) ->
gen_server:call(PidSpec, quit, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, EpochID, File, Offset, Chunk) ->
write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([I]) ->
@ -265,59 +306,74 @@ do_req(Req, S) ->
{{error, partition}, S2}
end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({wedge_status}, #state{sock=Sock}) ->
fun() -> ?FLU_C:wedge_status(Sock) end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end;
make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end;
make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) ->
fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end;
make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end;
make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
make_req_fun({checksum_list, EpochID, File},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:list_files(Sock, EpochID) end;
make_req_fun({wedge_status},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:wedge_status(Sock) end;
make_req_fun({get_epoch_id},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> case Mod:read_latest_projection(Sock, private) of
{ok, P} ->
#projection_v1{epoch_number=Epoch,
epoch_csum=CSum} =
machi_chain_manager1:inner_projection_or_self(P),
{ok, {Epoch, CSum}};
Error ->
Error
end
end;
make_req_fun({get_latest_epoch, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_latest_projection(Sock, ProjType) end;
make_req_fun({read_projection, ProjType, Epoch},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_projection(Sock, ProjType, Epoch) end;
make_req_fun({write_projection, ProjType, Proj},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_projection(Sock, ProjType, Proj) end;
make_req_fun({get_all_projections, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:get_all_projections(Sock, ProjType) end;
make_req_fun({list_all_projections, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:list_all_projections(Sock, ProjType) end.
connected_p(#state{sock=SockMaybe,
i=#p_srvr{proto=ipv4}=_I}=_S) ->
is_port(SockMaybe);
connected_p(#state{i=#p_srvr{proto=disterl,
name=_NodeName}=_I}=_S) ->
true.
%% case net_adm:ping(NodeName) of
%% ping ->
%% true;
%% _ ->
%% false
%% end.
i=#p_srvr{proto_mod=Mod}=_I}=_S) ->
Mod:connected_p(SockMaybe).
try_connect(#state{sock=undefined,
i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) ->
try
Sock = machi_util:connect(Host, TcpPort),
S#state{sock=Sock}
catch
_:_ ->
S
end;
i=#p_srvr{proto_mod=Mod}=P}=S) ->
Sock = Mod:connect(P),
S#state{sock=Sock};
try_connect(S) ->
%% If we're connection-based, we're already connected.
%% If we're not connection-based, then there's nothing to do.
S.
disconnect(#state{sock=undefined}=S) ->
S;
disconnect(#state{sock=Sock,
i=#p_srvr{proto=ipv4}=_I}=S) ->
(catch gen_tcp:close(Sock)),
S#state{sock=undefined};
disconnect(S) ->
S.
i=#p_srvr{proto_mod=Mod}=_I}=S) ->
Mod:disconnect(Sock),
S#state{sock=undefined}.

617
src/machi_yessir_client.erl Normal file
View file

@ -0,0 +1,617 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc "Yes, sir!" style dummy/mock client facade.
-module(machi_yessir_client).
-include("machi.hrl").
-include("machi_projection.hrl").
-export([
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
%% Projection API
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1,
%% Connection management API
connected_p/1, connect/1, disconnect/1
]).
%% For "internal" replication only.
-export([
write_chunk/5, write_chunk/6,
delete_migration/3, delete_migration/4,
trunc_hack/3, trunc_hack/4
]).
-record(yessir, {
name,
start,
start_bin,
num_files,
file_size,
chunk_size
}).
-type chunk() :: binary() | iolist(). % client can use either
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: {w,atom(),term()}.
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
-export_type([epoch_id/0]).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0)
after
disconnect(Sock)
end.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
_EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
File = list_to_binary([Prefix, $/, StartBin]),
Pos = case get({Name,offset,File}) of
undefined -> ?MINIMUM_OFFSET;
N -> N
end,
put({Name,offset,File}, Pos + size(Chunk) + ChunkExtra),
{ok, {File, Pos}}.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
after
disconnect(Sock)
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(#yessir{name=Name}, _EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
case get({Name,offset,File}) of
undefined ->
{error, no_such_file};
MaxOffset ->
if Offset > MaxOffset ->
{error, not_written};
%% To be more accurate, we ought to include this clause.
%% But checksum_list() is a bit dumb and can list a byte
%% range that overlaps the end of the file.
%% Offset + Size > MaxOffset ->
%% {error, partial_read};
true ->
Chunk = make_chunk(Name, Size),
{ok, Chunk}
end
end.
make_chunk(Name, Size) ->
case get({Name,chunk,Size}) of
undefined ->
Byte = Size rem 253,
C = list_to_binary(
lists:duplicate(Size, Byte)),
put({Name,chunk,Size}, C),
C;
C ->
C
end.
make_csum(Name,Size) ->
case get({Name,csum,Size}) of
undefined ->
C = crypto:hash(sha, make_chunk(Name, Size)),
put({Name,csum,Size}, C),
C;
C ->
C
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
read_chunk(Sock, EpochID, File, Offset, Size)
after
disconnect(Sock)
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
case get({Name,offset,File}) of
undefined ->
{error, no_such_file};
MaxOffset ->
CSum = make_csum(Name, ChunkSize),
Cs = [{Offset, ChunkSize, CSum} ||
Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)],
{ok, Cs}
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
checksum_list(Sock, EpochID, File)
after
disconnect(Sock)
end.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port_wrap(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(#yessir{name=Name}, _EpochID) ->
Files = [{Offset, File} || {{N,offset,File}, Offset} <- get(), N == Name],
{ok, Files}.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(inet_host(), inet_port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(_Host, _TcpPort, EpochID) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
list_files(Sock, EpochID)
after
disconnect(Sock)
end.
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port_wrap()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(_Sock) ->
{ok, {false, ?DUMMY_PV1_EPOCH}}.
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(inet_host(), inet_port()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(_Host, _TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
wedge_status(Sock)
after
disconnect(Sock)
end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
case read_latest_projection(Sock, ProjType) of
{ok, P} ->
{ok, {P#projection_v1.epoch_number, P#projection_v1.epoch_csum}};
_ ->
{ok, {0, <<"no such checksum">>}}
end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
get_latest_epoch(Sock, ProjType)
after
disconnect(Sock)
end.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port_wrap(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(#yessir{name=Name}, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Ps = [P || {{N,proj,PT,_Epoch}, P} <- get(), N == Name, PT == ProjType],
case (catch lists:last(lists:sort(Ps))) of
P when is_record(P, projection_v1) ->
{ok, P};
_ ->
{ok, #projection_v1{epoch_number=0,epoch_csum= <<"yo">>,
author_server=zzya,
all_members=[],creation_time=now(),upi=[],
repairing=[],down=[],dbg=[],dbg2=[],
members_dict=[]}}
end.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(inet_host(), inet_port(),
projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
read_latest_projection(Sock, ProjType)
after
disconnect(Sock)
end.
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port_wrap(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_projection(#yessir{name=Name}, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
case get({Name,proj,ProjType,Epoch}) of
undefined ->
{error, not_written};
P when is_record(P, projection_v1) ->
{ok, P}
end.
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(inet_host(), inet_port(),
projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
read_projection(_Host, _TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
read_projection(Sock, ProjType, Epoch)
after
disconnect(Sock)
end.
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port_wrap(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(#yessir{name=Name}=Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Epoch = Proj#projection_v1.epoch_number,
case read_projection(Sock, ProjType, Epoch) of
{error, not_written} ->
put({Name,proj,ProjType,Epoch}, Proj),
ok;
{ok, _} ->
{error, written}
end.
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(inet_host(), inet_port(),
projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(_Host, _TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
write_projection(Sock, ProjType, Proj)
after
disconnect(Sock)
end.
%% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(port_wrap(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(#yessir{name=Name}, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Ps = [Proj || {{N,proj,PT,_}, Proj} <- get(), N == Name, PT == ProjType],
{ok, lists:sort(Ps)}.
%% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
get_all_projections(Sock, ProjType)
after
disconnect(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(port_wrap(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
case get_all_projections(Sock, ProjType) of
{ok, Ps} ->
{ok, [P#projection_v1.epoch_number || P <- Ps]};
_ ->
{error, not_written}
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
list_all_projections(Sock, ProjType)
after
disconnect(Sock)
end.
%% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port_wrap()) ->
ok.
quit(_) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Pos = case get({Name,offset,File}) of
undefined -> Offset;
N -> erlang:max(N + size(Chunk), Offset)
end,
put({Name,offset,File}, Pos),
ok.
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(inet_host(), inet_port(),
epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
write_chunk(Sock, EpochID, File, Offset, Chunk)
after
disconnect(Sock)
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(#yessir{name=Name}, _EpochID, File) ->
case get({Name,offset,File}) of
undefined ->
{error, no_such_file};
_N ->
erase({name,offset,File}),
ok
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
delete_migration(Sock, EpochID, File)
after
disconnect(Sock)
end.
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(#yessir{name=Name}, _EpochID, File) ->
put({Name,offset,File}, ?MINIMUM_OFFSET).
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
try
trunc_hack(Sock, EpochID, File)
after
disconnect(Sock)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
connected_p(_) ->
true.
connect(#p_srvr{name=Name, props=Props})->
Now = os:timestamp(),
StartBin = proplists:get_value(file_suffix, Props,
list_to_binary(io_lib:format("~w", [Now]))),
NumFiles=proplists:get_value(num_files, Props, 12),
FileSize=proplists:get_value(file_size, Props, 50*1024*1024),
ChunkSize=proplists:get_value(chunk_size, Props, 64*1024),
Sock = #yessir{name=Name,
start=Now,
start_bin=StartBin,
num_files=NumFiles,
file_size=FileSize,
chunk_size=ChunkSize
},
%% Add fake dict entries for these files
[begin
Prefix = list_to_binary(io_lib:format("fake~w", [X])),
{ok, _} = append_chunk_extra(Sock, unused, Prefix, <<>>, FileSize)
end || X <- lists:seq(1, NumFiles)],
Sock.
disconnect(#yessir{name=Name}) ->
[erase(K) || {{N,offset,_}=K, _V} <- get(), N == Name],
[erase(K) || {{N,chunk,_}=K, _V} <- get(), N == Name],
[erase(K) || {{N,csum,_}=K, _V} <- get(), N == Name],
[erase(K) || {{N,proj,_,_}=K, _V} <- get(), N == Name],
ok.
%% Example use:
%% application:ensure_all_started(machi).
%% machi_flu_psup:start_flu_package(a, 4444, "./data.a", []).
%% D = [{a,{p_srvr,a,machi_yessir_client,x,x,
%% [{file_suffix,<<"yo">>},{num_files,7}]}},
%% {b,{p_srvr,b,machi_yessir_client,x,x,
%% [{file_suffix,<<"yo">>},{num_files,20}]}}].
%% machi_chain_manager1:set_chain_members(a_chmgr, D).
%% =INFO REPORT==== 17-May-2015::18:57:47 ===
%% Chain manager a found latest public projection 0 has author zzya not a member of our members list [a,b]. Please check chain membership on this rogue chain manager zzya.
%% ok
%% 5>
%% =INFO REPORT==== 17-May-2015::18:57:51 ===
%% Repair start: tail a of [a] -> [b], ap_mode ID {a,{1431,856671,140404}}
%% MissingFileSummary [{<<"fake1/yo">>,{52429824,[]}},
%% {<<"fake10/yo">>,{52429824,[a]}},
%% {<<"fake11/yo">>,{52429824,[a]}},
%% {<<"fake12/yo">>,{52429824,[a]}},
%% {<<"fake13/yo">>,{52429824,[a]}},
%% {<<"fake14/yo">>,{52429824,[a]}},
%% {<<"fake15/yo">>,{52429824,[a]}},
%% {<<"fake16/yo">>,{52429824,[a]}},
%% {<<"fake17/yo">>,{52429824,[a]}},
%% {<<"fake18/yo">>,{52429824,[a]}},
%% {<<"fake19/yo">>,{52429824,[a]}},
%% {<<"fake2/yo">>,{52429824,[]}},
%% {<<"fake20/yo">>,{52429824,[a]}},
%% {<<"fake3/yo">>,{52429824,[]}},
%% {<<"fake4/yo">>,{52429824,[]}},
%% {<<"fake5/yo">>,{52429824,[]}},
%% {<<"fake6/yo">>,{52429824,[]}},
%% {<<"fake7/yo">>,{52429824,[]}},
%% {<<"fake8/yo">>,{52429824,[a]}},
%% {<<"fake9/yo">>,{52429824,[a]}}]
%% Make repair directives: .................... done
%% Out-of-sync data for FLU a: 650.8 MBytes
%% Out-of-sync data for FLU b: 0.0 MBytes
%% Execute repair directives: ..................................................................................................................... done
%% =INFO REPORT==== 17-May-2015::18:57:52 ===
%% Repair success: tail a of [a] finished ap_mode repair ID {a,{1431,856671,140404}}: ok
%% Stats [{t_in_files,0},{t_in_chunks,10413},{t_in_bytes,682426368},{t_out_files,0},{t_out_chunks,10413},{t_out_bytes,682426368},{t_bad_chunks,0},{t_elapsed_seconds,1.591}]

View file

@ -38,7 +38,7 @@ verify_file_checksums_test() ->
W_props = [{initial_wedged, false}],
FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir,
W_props),
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
Prefix = <<"verify_prefix">>,
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,

View file

@ -140,7 +140,7 @@ smoke0_test() ->
Host = "localhost",
TcpPort = 6623,
{ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]),
Pa = #p_srvr{name=a, proto=ipv4, address=Host, port=TcpPort},
Pa = #p_srvr{name=a, address=Host, port=TcpPort},
Members_Dict = machi_projection:make_members_dict([Pa]),
%% Egadz, more racing on startup, yay. TODO fix.
timer:sleep(1),

View file

@ -87,6 +87,23 @@ flu_smoke_test() ->
?DUMMY_PV1_EPOCH,
File1, Off1, Len1*984),
{ok, {Off1b,Len1b,File1b}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
Extra = 42,
{ok, {Off1c,Len1c,File1c}} = ?FLU_C:append_chunk_extra(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1, Extra),
{ok, {Off1d,Len1d,File1d}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
if File1b == File1c, File1c == File1d ->
true = (Off1c == Off1b + Len1b),
true = (Off1d == Off1c + Len1c + Extra);
true ->
exit(not_mandatory_but_test_expected_same_file_fixme)
end,
Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77,
@ -132,7 +149,7 @@ flu_projection_smoke_test() ->
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),

View file

@ -74,10 +74,11 @@ partial_stop_restart2() ->
Dict = orddict:from_list(Ps),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
DbgProps = [{dbg, [{initial_wedged, true}]}],
Start = fun({_,P}) ->
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(
Name, Port, Dir, [{active_mode,false}])
Name, Port, Dir, [{active_mode,false}|DbgProps])
end,
WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) ->
machi_flu1_client:wedge_status(Addr, TcpPort)

View file

@ -39,10 +39,10 @@ api_smoke_test() ->
erase(flu_pid),
try
I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort},
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = {-1, <<0:(20*8)/big>>},
FakeEpoch = ?DUMMY_PV1_EPOCH,
[{ok, {_,_,_}} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity) || _ <- lists:seq(1,5)],
@ -63,6 +63,11 @@ api_smoke_test() ->
?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, <<"prefix">>,
MyChunk2, 4242, infinity),
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,