Add namespace info to wedge_status API call; add namespace enforcement @ machi_flu1_net_server

This commit is contained in:
Scott Lystig Fritchie 2015-12-31 14:34:15 +09:00
parent f09eef14eb
commit a3fc1c3d68
12 changed files with 99 additions and 67 deletions

View file

@ -46,7 +46,7 @@
-record(ns_info, {
version = 0 :: machi_dt:namespace_version(),
name = "" :: machi_dt:namespace(),
name = <<>> :: machi_dt:namespace(),
locator = 0 :: machi_dt:locator()
}).

View file

@ -507,6 +507,8 @@ message Mpb_LL_WedgeStatusResp {
required Mpb_GeneralStatusCode status = 1;
optional Mpb_EpochID epoch_id = 2;
optional bool wedged_flag = 3;
optional uint32 namespace_version = 4;
optional string namespace = 5;
}
// Low level API: delete_migration()

View file

@ -477,7 +477,7 @@ witnesses_use_our_epoch([FLU|RestFLUs],
Proxy = orddict:fetch(FLU, PD),
%% Check both that the EpochID is the same *and* not wedged!
case ?FLU_PC:wedge_status(Proxy, ?TIMEOUT) of
{ok, {false, EID}} when EID == EpochID ->
{ok, {false, EID,_,_}} when EID == EpochID ->
witnesses_use_our_epoch(RestFLUs, S);
_Else ->
false

View file

@ -44,7 +44,7 @@
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type locator() :: number().
-type namespace() :: string().
-type namespace() :: binary().
-type namespace_version() :: non_neg_integer().
-type ns_info() :: #ns_info{}.
-type projection() :: #projection_v1{}.

View file

@ -243,7 +243,7 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port_wrap()) ->
{ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}.
{ok, {boolean(), machi_dt:epoch_id(), machi_dt:namespace_version(),machi_dt:namespace()}} | {error, term()}.
wedge_status(Sock) ->
wedge_status2(Sock).
@ -251,7 +251,7 @@ wedge_status(Sock) ->
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(machi_dt:inet_host(), machi_dt:inet_port()) ->
{ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}.
{ok, {boolean(), machi_dt:epoch_id(), machi_dt:namespace_version(),machi_dt:namespace()}} | {error, term()}.
wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try

View file

@ -66,6 +66,10 @@
flu_name :: pv1_server(),
%% Used in server_wedge_status to lookup the table
epoch_tab :: ets:tab(),
%% Clustering: cluster map version number
namespace_version = 0 :: machi_dt:namespace_version(),
%% Clustering: my (and my chain's) assignment to a specific namespace
namespace = <<"">> :: machi_dt:namespace(),
%% High mode only
high_clnt :: pid(),
@ -228,6 +232,8 @@ do_pb_ll_request(PB_request, S) ->
end,
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
%% do_pb_ll_request2(): Verification of epoch details & namespace details.
do_pb_ll_request2(NSVersion, NS, EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = lookup_epoch(S),
if not is_tuple(EpochID) orelse tuple_size(EpochID) /= 2 ->
@ -238,26 +244,26 @@ do_pb_ll_request2(NSVersion, NS, EpochID, CMD, S) ->
{Epoch, _} = EpochID,
{CurrentEpoch, _} = CurrentEpochID,
if Epoch < CurrentEpoch ->
ok;
{{error, bad_epoch}, S};
true ->
%% We're at same epoch # but different checksum, or
%% we're at a newer/bigger epoch #.
_ = machi_flu1:wedge_myself(S#state.flu_name, CurrentEpochID),
ok
end,
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
{{error, wedged}, S#state{epoch_id=CurrentEpochID}}
end;
true ->
do_pb_ll_request2b(CMD, S#state{epoch_id=CurrentEpochID})
#state{namespace_version=MyNSVersion, namespace=MyNS} = S,
if NSVersion /= MyNSVersion ->
{{error, bad_epoch}, S};
NS /= MyNS ->
{{error, bad_arg}, S};
true ->
do_pb_ll_request3(CMD, S)
end
end.
lookup_epoch(#state{epoch_tab=T}) ->
%% TODO: race in shutdown to access ets table after owner dies
ets:lookup_element(T, epoch, 2).
do_pb_ll_request2b(CMD, S) ->
io:format(user, "TODO: check NSVersion & NS\n", []),
do_pb_ll_request3(CMD, S).
%% Witness status does not matter below.
do_pb_ll_request3({low_echo, Msg}, S) ->
{Msg, S};
@ -463,14 +469,14 @@ do_server_list_files(#state{data_dir=DataDir}=_S) ->
{Size, File}
end || File <- Files]}.
do_server_wedge_status(S) ->
do_server_wedge_status(#state{namespace_version=NSVersion, namespace=NS}=S) ->
{Wedged_p, CurrentEpochID0} = lookup_epoch(S),
CurrentEpochID = if CurrentEpochID0 == undefined ->
?DUMMY_PV1_EPOCH;
true ->
CurrentEpochID0
end,
{Wedged_p, CurrentEpochID}.
{Wedged_p, CurrentEpochID, NSVersion, NS}.
do_server_delete_migration(File, #state{data_dir=DataDir}=_S) ->
case sanitize_file_string(File) of

View file

@ -54,12 +54,13 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
append_chunk=IR=#mpb_ll_appendchunkreq{
namespace_version=NSVersion,
namespace=NS,
namespace=NS_str,
locator=NSLocator,
epoch_id=PB_EpochID,
prefix=Prefix,
chunk=Chunk,
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) ->
NS = list_to_binary(NS_str),
EpochID = conv_to_epoch_id(PB_EpochID),
CSum_tag = conv_to_csum_tag(CSum_type),
Opts = conv_to_append_opts(IR),
@ -71,12 +72,13 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
write_chunk=#mpb_ll_writechunkreq{
namespace_version=NSVersion,
namespace=NS,
namespace=NS_str,
epoch_id=PB_EpochID,
chunk=#mpb_chunk{file_name=File,
offset=Offset,
chunk=Chunk,
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) ->
NS = list_to_binary(NS_str),
EpochID = conv_to_epoch_id(PB_EpochID),
CSum_tag = conv_to_csum_tag(CSum_type),
{ReqID, {low_write_chunk, NSVersion, NS, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
@ -84,12 +86,13 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
read_chunk=#mpb_ll_readchunkreq{
namespace_version=NSVersion,
namespace=NS,
namespace=NS_str,
epoch_id=PB_EpochID,
chunk_pos=ChunkPos,
flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk,
flag_needs_trimmed=PB_NeedsTrimmed}}) ->
NS = list_to_binary(NS_str),
EpochID = conv_to_epoch_id(PB_EpochID),
Opts = #read_opts{no_checksum=PB_GetNoChecksum,
no_chunk=PB_GetNoChunk,
@ -102,12 +105,13 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
trim_chunk=#mpb_ll_trimchunkreq{
namespace_version=NSVersion,
namespace=NS,
namespace=NS_str,
epoch_id=PB_EpochID,
file=File,
offset=Offset,
size=Size,
trigger_gc=TriggerGC}}) ->
NS = list_to_binary(NS_str),
EpochID = conv_to_epoch_id(PB_EpochID),
{ReqID, {low_trim_chunk, NSVersion, NS, EpochID, File, Offset, Size, TriggerGC}};
from_pb_request(#mpb_ll_request{
@ -179,10 +183,11 @@ from_pb_request(#mpb_request{req_id=ReqID,
{ReqID, {high_auth, User, Pass}};
from_pb_request(#mpb_request{req_id=ReqID,
append_chunk=IR=#mpb_appendchunkreq{}}) ->
#mpb_appendchunkreq{namespace=NS,
#mpb_appendchunkreq{namespace=NS_str,
prefix=Prefix,
chunk=Chunk,
csum=CSum} = IR,
NS = list_to_binary(NS_str),
TaggedCSum = make_tagged_csum(CSum, Chunk),
Opts = conv_to_append_opts(IR),
{ReqID, {high_append_chunk, NS, Prefix, Chunk, TaggedCSum, Opts}};
@ -310,9 +315,16 @@ from_pb_response(#mpb_ll_response{
from_pb_response(#mpb_ll_response{
req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusresp{
epoch_id=PB_EpochID, wedged_flag=Wedged_p}}) ->
status=Status,
epoch_id=PB_EpochID, wedged_flag=Wedged_p,
namespace_version=NSVersion, namespace=NS_str}}) ->
GeneralStatus = case machi_pb_high_client:convert_general_status_code(Status) of
ok -> ok;
_Else -> {yukky, _Else}
end,
EpochID = conv_to_epoch_id(PB_EpochID),
{ReqID, {ok, {Wedged_p, EpochID}}};
NS = list_to_binary(NS_str),
{ReqID, {GeneralStatus, {Wedged_p, EpochID, NSVersion, NS}}};
from_pb_response(#mpb_ll_response{
req_id=ReqID,
delete_migration=#mpb_ll_deletemigrationresp{
@ -511,7 +523,7 @@ to_pb_response(ReqID, {low_skip_wedge, {low_echo, _Msg}}, Resp) ->
#mpb_ll_response{
req_id=ReqID,
echo=#mpb_echoresp{message=Resp}};
to_pb_response(ReqID, {low_skip_wedige, {low_auth, _, _}}, __TODO_Resp) ->
to_pb_response(ReqID, {low_skip_wedge, {low_auth, _, _}}, __TODO_Resp) ->
#mpb_ll_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
@ -608,13 +620,16 @@ to_pb_response(ReqID, {low_skip_wedge, {low_wedge_status}}, Resp) ->
Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusresp{status=Status}};
{Wedged_p, EpochID} ->
{Wedged_p, EpochID, NSVersion, NS} ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_response{req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusresp{
status='OK',
epoch_id=PB_EpochID,
wedged_flag=Wedged_p}}
wedged_flag=Wedged_p,
namespace_version=NSVersion,
namespace=NS
}}
end;
to_pb_response(ReqID, {low_skip_wedge, {low_delete_migration, _EID, _Fl}}, Resp)->
Status = conv_from_status(Resp),
@ -807,12 +822,12 @@ make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=CSum}, _CB) ->
make_ll_error_resp(ReqID, Code, Msg) ->
#mpb_ll_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
msg=Msg}}.
make_error_resp(ReqID, Code, Msg) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
msg=Msg}}.
conv_from_epoch_id({Epoch, EpochCSum}) ->
#mpb_epochid{epoch_number=Epoch,
@ -972,18 +987,6 @@ conv_from_status(_OOPS) ->
io:format(user, "HEY, ~s:~w got ~p\n", [?MODULE, ?LINE, _OOPS]),
'BAD_JOSS'.
conv_to_boolean(undefined) ->
false;
conv_to_boolean(0) ->
false;
conv_to_boolean(N) when is_integer(N) ->
true.
conv_from_boolean(false) ->
0;
conv_from_boolean(true) ->
1.
conv_from_append_opts(#append_opts{chunk_extra=ChunkExtra,
preferred_file_name=Pref,
flag_fail_preferred=FailPref}) ->

View file

@ -401,7 +401,7 @@ nonunanimous_setup_and_fix_test2() ->
Mb, ChainName, TheEpoch_3, ap_mode, MembersDict4, []),
Advance(),
{ok, {true, _}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {true, _,_,_}} = ?FLU_PC:wedge_status(Proxy_a),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b,c], repairing=[]}} =
@ -451,9 +451,9 @@ nonunanimous_setup_and_fix_test2() ->
#p_srvr{name=NameA} = hd(Ps),
{ok,_}=machi_flu_psup:start_flu_package(NameA, TcpPort+1, hd(Dirs), Opts),
Advance(),
{ok, {true, _}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, EpochID_8}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, EpochID_8}} = ?FLU_PC:wedge_status(Proxy_c),
{ok, {true, _,_,_}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, EpochID_8,_,_}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, EpochID_8,_,_}} = ?FLU_PC:wedge_status(Proxy_c),
[{ok, #projection_v1{upi=[b,c], repairing=[]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
@ -463,8 +463,8 @@ nonunanimous_setup_and_fix_test2() ->
ok = machi_flu_psup:stop_flu_package(a),
Advance(),
machi_flu1_test:clean_up_data_dir(hd(Dirs)),
{ok, {false, _}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, _}} = ?FLU_PC:wedge_status(Proxy_c),
{ok, {false, _,_,_}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, _,_,_}} = ?FLU_PC:wedge_status(Proxy_c),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Add a to the chain again (a is stopped).\n", []),
@ -482,9 +482,9 @@ nonunanimous_setup_and_fix_test2() ->
{ok,_}=machi_flu_psup:start_flu_package(NameA, TcpPort+1, hd(Dirs), Opts),
Advance(),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_c),
{ok, {false, {TheEpoch10,_},_,_}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, {TheEpoch10,_},_,_}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, {TheEpoch10,_},_,_}} = ?FLU_PC:wedge_status(Proxy_c),
[{ok, #projection_v1{upi=[b,c], repairing=[a]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
ok

View file

@ -259,15 +259,15 @@ witness_smoke_test2() ->
%% Let's wedge OurWitness and see what happens: timeout/partition.
#p_srvr{name=WitName, address=WitA, port=WitP} =
orddict:fetch(OurWitness, D),
{ok, {false, EpochID2}} = machi_flu1_client:wedge_status(WitA, WitP),
{ok, {false, EpochID2,_,_}} = machi_flu1_client:wedge_status(WitA, WitP),
machi_flu1:wedge_myself(WitName, EpochID2),
case machi_flu1_client:wedge_status(WitA, WitP) of
{ok, {true, EpochID2}} ->
{ok, {true, EpochID2,_,_}} ->
ok;
{ok, {false, EpochID2}} ->
{ok, {false, EpochID2,_,_}} ->
%% This is racy. Work around it by sleeping a while.
timer:sleep(6*1000),
{ok, {true, EpochID2}} =
{ok, {true, EpochID2,_,_}} =
machi_flu1_client:wedge_status(WitA, WitP)
end,

View file

@ -104,7 +104,7 @@ flu_smoke_test() ->
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, BadFile),
{ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
{ok, {false, _}} = ?FLU_C:wedge_status(Host, TcpPort),
{ok, {false, _,_,_}} = ?FLU_C:wedge_status(Host, TcpPort),
Chunk1 = <<"yo!">>,
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo,
@ -173,6 +173,28 @@ flu_smoke_test() ->
NSInfo, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Len2, noopt),
%% Make a connected socket.
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
%% Let's test some cluster version enforcement.
Good_EpochNum = 0,
Good_NSVersion = 0,
Good_NS = <<>>,
{ok, {false, {Good_EpochNum,_}, Good_NSVersion, GoodNS}} =
?FLU_C:wedge_status(Sock1),
NS_good = #ns_info{version=Good_NSVersion, name=Good_NS},
{ok, {[{_, Off2, Chunk2, _}], _}} =
?FLU_C:read_chunk(Sock1, NS_good, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2, noopt),
NS_bad_version = #ns_info{version=1, name=Good_NS},
NS_bad_name = #ns_info{version=Good_NSVersion, name= <<"foons">>},
{error, bad_epoch} =
?FLU_C:read_chunk(Sock1, NS_bad_version, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2, noopt),
{error, bad_arg} =
?FLU_C:read_chunk(Sock1, NS_bad_name, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2, noopt),
%% We know that File1 still exists. Pretend that we've done a
%% migration and exercise the delete_migration() API.
ok = ?FLU_C:delete_migration(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1),
@ -188,8 +210,7 @@ flu_smoke_test() ->
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
port=TcpPort}))
ok = ?FLU_C:quit(Sock1)
after
machi_test_util:stop_flu_package()
end.
@ -202,7 +223,7 @@ flu_projection_smoke_test() ->
try
[ok = flu_projection_common(Host, TcpPort, T) ||
T <- [public, private] ]
%% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
%% , {ok, {false, EpochID1,_,_}} = ?FLU_C:wedge_status(Host, TcpPort),
%% io:format(user, "EpochID1 ~p\n", [EpochID1])
after
machi_test_util:stop_flu_package()
@ -278,7 +299,7 @@ witness_test() ->
File, 9999, 9999, noopt),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, File),
{error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),
{ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
{ok, {false, EpochID1,_,_}} = ?FLU_C:wedge_status(Host, TcpPort),
{ok, _} = ?FLU_C:get_latest_epochid(Host, TcpPort, public),
{ok, _} = ?FLU_C:read_latest_projection(Host, TcpPort, public),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort,

View file

@ -94,13 +94,13 @@ partial_stop_restart2() ->
end,
try
[Start(P) || P <- Ps],
[{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged
[{ok, {true, _,_,_}} = WedgeStatus(P) || P <- Ps], % all are wedged
[{error,wedged} = Append(P, ?DUMMY_PV1_EPOCH) || P <- Ps], % all are wedged
[machi_chain_manager1:set_chain_members(ChMgr, Dict) ||
ChMgr <- ChMgrs ],
{ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
[{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged
{ok, {false, EpochID1,_,_}} = WedgeStatus(hd(Ps)),
[{ok, {false, EpochID1,_,_}} = WedgeStatus(P) || P <- Ps], % *not* wedged
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
{ok, {_,_,File1}} = Append(hd(Ps), EpochID1),
@ -126,9 +126,9 @@ partial_stop_restart2() ->
Epoch_m = Proj_m#projection_v1.epoch_number,
%% Confirm that all FLUs are *not* wedged, with correct proj & epoch
Proj_mCSum = Proj_m#projection_v1.epoch_csum,
[{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged
[{ok, {false, {Epoch_m, Proj_mCSum},_,_}} = WedgeStatus(P) || % *not* wedged
P <- Ps],
{ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
{ok, {false, EpochID1,_,_}} = WedgeStatus(hd(Ps)),
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
%% Stop all but 'a'.
@ -160,7 +160,7 @@ partial_stop_restart2() ->
{now_using,_,Epoch_n} = machi_chain_manager1:trigger_react_to_env(
hd(ChMgrs)),
true = (Epoch_n > Epoch_m),
{ok, {false, EpochID3}} = WedgeStatus(hd(Ps)),
{ok, {false, EpochID3,_,_}} = WedgeStatus(hd(Ps)),
%% The file we're assigned should be different with the epoch change.
{ok, {_,_,File3}} = Append(hd(Ps), EpochID3),
true = (File1 /= File3),

View file

@ -89,7 +89,7 @@ io:format(user, "\nTODO: fix write_chunk() call below @ ~s LINE ~w\n", [?MODULE,
BadFile = <<"no-such-file">>,
{error, bad_arg} = ?MUT:checksum_list(Prox1, BadFile),
{ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, {false, _}} = ?MUT:wedge_status(Prox1),
{ok, {false, _,_,_}} = ?MUT:wedge_status(Prox1),
{ok, {0, _SomeCSum}} = ?MUT:get_latest_epochid(Prox1, public),
{ok, #projection_v1{epoch_number=0}} =
?MUT:read_latest_projection(Prox1, public),