From ae1d038abe00d2958098a3b0833de72c0e84de69 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 13:40:44 +0900 Subject: [PATCH 01/12] Change default value of chmgr's use_partition_simulator to false --- src/machi_chain_manager1.erl | 2 +- test/machi_chain_manager1_converge_demo.erl | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index a983ea7..e784b00 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -171,7 +171,7 @@ init({MyName, InitMembersDict, MgrOpts}) -> All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, RunEnv = [{seed, Opt(seed, now())}, - {use_partition_simulator, Opt(use_partition_simulator, true)}, + {use_partition_simulator, Opt(use_partition_simulator, false)}, {network_partitions, Opt(network_partitions, [])}, {network_islands, Opt(network_islands, [])}, {flapping_i, Opt(flapping, [])}, diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index 30e7b23..af14870 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -167,7 +167,8 @@ convergence_demo_testfun(NumFLUs) -> {Name, PPid} end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs], MembersDict = machi_projection:make_members_dict(Ps), - MgrOpts = [private_write_verbose, {active_mode,false}], + MgrOpts = [private_write_verbose, {active_mode,false}, + {use_partition_simulator, true}], MgrNamez = [begin {ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts), From 762aef557f62f5ca095523d6953182f8dbb61913 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 15:36:53 +0900 Subject: [PATCH 02/12] WIP: Set the stage for FLU wedging API --- TODO-shortterm.org | 2 +- src/machi_chain_manager1.erl | 19 +++++++----- test/machi_flu_psup_test.erl | 59 ++++++++++++++++++++++++------------ 3 files changed, 53 insertions(+), 27 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 9dbd35f..cc81b8b 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -30,9 +30,9 @@ func, and pattern match Erlang style in that func. ** DONE Move prototype/chain-manager code to "top" of source tree *** DONE Preserve current test code (leave as-is? tiny changes?) *** DONE Make chain manager code flexible enough to run "real world" or "sim" +** TODO Add projection wedging logic to each FLU. ** TODO Implement real data repair, orchestrated by the chain manager ** TODO Change all protocol ops to enforce the epoch ID -** TODO Add projection wedging logic to each FLU. - Add no-wedging state to make testing easier? diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index e784b00..57de755 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -523,14 +523,19 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, Down = AllMembers -- Up, NewUPI_list = [X || X <- OldUPI_list, lists:member(X, Up)], + LastInNewUPI = case NewUPI_list of + [] -> does_not_exist_because_upi_is_empty; + [_|_] -> lists:last(NewUPI_list) + end, Repairing_list2 = [X || X <- OldRepairing_list, lists:member(X, Up)], + Simulator_p = proplists:get_value(use_partition_simulator, RunEnv2, false), {NewUPI_list3, Repairing_list3, RunEnv3} = case {NewUp, Repairing_list2} of {[], []} -> D_foo=[], {NewUPI_list, [], RunEnv2}; - {[], [H|T]} when RelativeToServer == hd(NewUPI_list) -> - %% The author is head of the UPI list. Let's see if + {[], [H|T]} when RelativeToServer == LastInNewUPI -> + %% The author is tail of the UPI list. Let's see if %% *everyone* in the UPI+repairing lists are using our %% projection. This is to simulate a requirement that repair %% a real repair process cannot take place until the chain is @@ -540,12 +545,12 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, SameEpoch_p = check_latest_private_projections_same_epoch( tl(NewUPI_list) ++ Repairing_list2, S#ch_mgr.proj, Partitions, S), - if not SameEpoch_p -> - D_foo=[], - {NewUPI_list, OldRepairing_list, RunEnv2}; - true -> + if Simulator_p andalso SameEpoch_p -> D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], - {NewUPI_list ++ [H], T, RunEnv2} + {NewUPI_list ++ [H], T, RunEnv2}; + true -> + D_foo=[], + {NewUPI_list, OldRepairing_list, RunEnv2} end; {_, _} -> D_foo=[], diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 59ea017..9b76866 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -61,47 +61,68 @@ smoke_test2() -> ok end. -smoke2_test_() -> - {timeout, 5*60, fun() -> smoke2_test2() end}. +partial_stop_restart_test_() -> + {timeout, 5*60, fun() -> partial_stop_restart2() end}. -smoke2_test2() -> +partial_stop_restart2() -> Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}}, {b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}}, {c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}} ], + ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps], + PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps], + Dict = orddict:from_list(Ps), [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), try - [begin - #p_srvr{name=Name, port=Port, props=Dir} = P, - {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, - [{active_mode,false}]) - end || {_,P} <- Ps], + Start = fun({_,P}) -> + #p_srvr{name=Name, port=Port, props=Dir} = P, + {ok, _} = machi_flu_psup:start_flu_package( + Name, Port, Dir, [{active_mode,false}]) + end, + [Start(P) || P <- Ps], + %% TODO: Confirm that all FLUs are wedged - ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps], - PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps], - Dict = orddict:from_list(Ps), [machi_chain_manager1:set_chain_members(ChMgr, Dict) || ChMgr <- ChMgrs ], + %% TODO: Confirm that all FLUs are wedged - {now_using,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), + {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), [begin _QQa = machi_chain_manager1:test_react_to_env(ChMgr) end || _ <- lists:seq(1,25), ChMgr <- ChMgrs], - %% All chain maanagers & projection stores should be using the + %% All chain managers & projection stores should be using the %% same projection which is max projection in each store. - {no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env( - hd(ChMgrs)), - [{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env( + {no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env( + hd(ChMgrs)), + [{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env( ChMgr )|| ChMgr <- ChMgrs], - {ok, Proj_z} = machi_projection_store:read_latest_projection( + {ok, Proj_m} = machi_projection_store:read_latest_projection( hd(PStores), public), [begin - {ok, Proj_z} = machi_projection_store:read_latest_projection( + {ok, Proj_m} = machi_projection_store:read_latest_projection( PStore, ProjType) end || ProjType <- [public, private], PStore <- PStores ], - Epoch_z = Proj_z#projection_v1.epoch_number, + Epoch_m = Proj_m#projection_v1.epoch_number, + %% TODO: Confirm that all FLUs are *not* wedged + + %% Stop all but 'a'. + [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], + + %% Stop and restart a. + {FluName_a, _} = hd(Ps), + ok = machi_flu_psup:stop_flu_package(FluName_a), + {ok, _} = Start(hd(Ps)), + %% Remember: 'a' is not in active mode. + {ok, Proj_m} = machi_projection_store:read_latest_projection( + hd(PStores), private), + %% TODO: confirm that 'a' is wedged + {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( + hd(ChMgrs)), + true = (Epoch_n > Epoch_m), + %% TODO: confirm that 'b' is wedged + ok after exit(SupPid, normal), From 1dc759b908323d0dd1aa56821e37a4fe6894dda9 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 16:53:10 +0900 Subject: [PATCH 03/12] WIP: add wedge_status() query to client --- src/machi_flu1.erl | 42 ++++++++++++++++++++++++---- src/machi_flu1_client.erl | 44 ++++++++++++++++++++++++++++++ test/machi_chain_manager1_test.erl | 5 ++++ test/machi_flu1_test.erl | 1 + 4 files changed, 87 insertions(+), 5 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 7b06f62..4265718 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -75,8 +75,8 @@ append_pid :: pid(), tcp_port :: non_neg_integer(), data_dir :: string(), - wedge = true :: 'disabled' | boolean(), - my_epoch_id :: 'undefined', + wedge = true :: boolean(), + epoch_id :: 'undefined' | machi_projection:pv_epoch(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist }). @@ -100,6 +100,8 @@ main2(FluName, TcpPort, DataDir, Rest) -> S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, + wedge=true, + epoch_id=undefined, props=Rest}, AppendPid = start_append_server(S0), {_ProjRegName, ProjectionPid} = @@ -118,8 +120,7 @@ main2(FluName, TcpPort, DataDir, Rest) -> undefined -> S1; DbgProps -> - S1#state{wedge=disabled, - dbg_props=DbgProps, + S1#state{dbg_props=DbgProps, props=lists:keydelete(dbg, 1, Rest)} end, ListenPid = start_listen_server(S2), @@ -176,7 +177,15 @@ append_server_loop(FluPid, #state{data_dir=DataDir}=S) -> %% DataDir, FluPid) end), append_server_loop(FluPid, S); {wedge_state_change, Boolean} -> - append_server_loop(FluPid, S#state{wedge=Boolean}) + %% append_server_loop(FluPid, S#state{wedge=Boolean}) + append_server_loop(FluPid, S); + {wedge_status, FromPid} -> + #state{wedge=Wedge_p, epoch_id=EpochId} = S, + FromPid ! {wedge_status_reply, Wedge_p, EpochId}, + append_server_loop(FluPid, S); + Else -> + io:format(user, "append_server_loop: WHA? ~p\n", [Else]), + append_server_loop(FluPid, S) end. -define(EpochIDSpace, (4+20)). @@ -233,6 +242,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> do_net_server_truncate_hackityhack(Sock, File, DataDir); <<"PROJ ", LenHex:8/binary, "\n">> -> do_projection_command(Sock, LenHex, S); + <<"WEDGE-STATUS\n">> -> + do_wedge_status(FluName, Sock); _ -> machi_util:verb("Else Got: ~p\n", [Line]), gen_tcp:send(Sock, "ERROR SYNTAX\n"), @@ -286,6 +297,27 @@ do_net_server_append2(FluName, Sock, LenHex, Prefix) -> ok = gen_tcp:send(Sock, "TIMEOUT\n") end. +do_wedge_status(FluName, Sock) -> + FluName ! {wedge_status, self()}, + Reply = receive + {wedge_status_reply, Bool, EpochId} -> + BoolHex = if Bool == false -> <<"00">>; + Bool == true -> <<"01">> + end, + case EpochId of + undefined -> + EpochHex = machi_util:int_to_hexstr(0, 32), + CSumHex = machi_util:bin_to_hexstr(<<0:(20*8)/big>>); + {Epoch, EpochCSum} -> + EpochHex = machi_util:int_to_hexstr(Epoch, 32), + CSumHex = machi_util:bin_to_hexstr(EpochCSum) + end, + [<<"OK ">>, BoolHex, 32, EpochHex, 32, CSumHex, 10] + after 30*1000 -> + <<"give_it_up\n">> + end, + ok = gen_tcp:send(Sock, Reply). + do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index d2dac02..be0dc54 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -31,6 +31,7 @@ read_chunk/5, read_chunk/6, checksum_list/3, checksum_list/4, list_files/2, list_files/3, + wedge_status/1, wedge_status/2, %% Projection API get_latest_epoch/2, get_latest_epoch/3, @@ -153,6 +154,27 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> catch gen_tcp:close(Sock) end. +%% @doc Fetch the wedge status from the remote FLU. + +-spec wedge_status(port()) -> + {ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}. + +wedge_status(Sock) when is_port(Sock) -> + wedge_status2(Sock). + +%% @doc Fetch the wedge status from the remote FLU. + +-spec wedge_status(inet_host(), inet_port()) -> + {ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}. +wedge_status(Host, TcpPort) when is_integer(TcpPort) -> + Sock = machi_util:connect(Host, TcpPort), + try + wedge_status2(Sock) + after + catch gen_tcp:close(Sock) + end. + + %% @doc Get the latest epoch number + checksum from the FLU's projection store. -spec get_latest_epoch(port(), projection_type()) -> @@ -476,6 +498,28 @@ list3({ok, Line}, Sock) -> list3(Else, _Sock) -> throw({server_protocol_error, Else}). +wedge_status2(Sock) -> + try + ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]), + ok = inet:setopts(Sock, [{packet, line}]), + {ok, <<"OK ", + BooleanHex:2/binary, " ", + EpochHex:8/binary, " ", + CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0), + ok = inet:setopts(Sock, [{packet, raw}]), + Boolean = if BooleanHex == <<"00">> -> false; + BooleanHex == <<"01">> -> true + end, + Res = {Boolean, {machi_util:hexstr_to_int(EpochHex), + machi_util:hexstr_to_bin(CSumHex)}}, + {ok, Res} + catch + throw:Error -> + Error; + error:{badmatch,_}=BadMatch -> + {error, {badmatch, BadMatch}} + end. + checksum_list2(Sock, EpochID, File) -> erase(bad_sock), try diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index 89f586a..c51559d 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -170,6 +170,11 @@ smoke1_test() -> {ok, M0} = ?MGR:start_link(a, MembersDict, [{active_mode,false}]), try {ok, P1} = ?MGR:test_calc_projection(M0, false), + % DERP! Check for race with manager's proxy vs. proj listener + case ?MGR:test_read_latest_public_projection(M0, false) of + {error, partition} -> timer:sleep(500); + _ -> ok + end, {local_write_result, ok, {remote_write_results, [{b,ok},{c,ok}]}} = ?MGR:test_write_public_projection(M0, P1), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index b4580e2..a521bd0 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -64,6 +64,7 @@ flu_smoke_test() -> ?DUMMY_PV1_EPOCH, BadFile), {ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH), + {ok, {true, {0,<<0:(20*8)/big>>}}} = ?FLU_C:wedge_status(Host, TcpPort), Chunk1 = <<"yo!">>, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, From d6d003618d54a1fab5f9226cab3294df62776427 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 16:58:06 +0900 Subject: [PATCH 04/12] WIP: add wedge_status() query to proxy client --- src/machi_flu1_client.erl | 1 - src/machi_proxy_flu1_client.erl | 14 ++++++++++++++ test/machi_proxy_flu1_client_test.erl | 1 + 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index be0dc54..8a1fc1e 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -174,7 +174,6 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) -> catch gen_tcp:close(Sock) end. - %% @doc Get the latest epoch number + checksum from the FLU's projection store. -spec get_latest_epoch(port(), projection_type()) -> diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index f69d573..3bbaa28 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -54,6 +54,7 @@ read_chunk/5, read_chunk/6, checksum_list/3, checksum_list/4, list_files/2, list_files/3, + wedge_status/1, wedge_status/2, %% %% Projection API get_latest_epoch/2, get_latest_epoch/3, @@ -131,6 +132,17 @@ list_files(PidSpec, EpochID, Timeout) -> gen_server:call(PidSpec, {req, {list_files, EpochID}}, Timeout). +%% @doc Fetch the wedge status from the remote FLU. + +wedge_status(PidSpec) -> + wedge_status(PidSpec, infinity). + +%% @doc Fetch the wedge status from the remote FLU. + +wedge_status(PidSpec, Timeout) -> + gen_server:call(PidSpec, {req, {wedge_status}}, + Timeout). + %% @doc Get the latest epoch number + checksum from the FLU's projection store. get_latest_epoch(PidSpec, ProjType) -> @@ -261,6 +273,8 @@ make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) -> fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end; make_req_fun({list_files, EpochID}, #state{sock=Sock}) -> fun() -> ?FLU_C:list_files(Sock, EpochID) end; +make_req_fun({wedge_status}, #state{sock=Sock}) -> + fun() -> ?FLU_C:wedge_status(Sock) end; make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) -> fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end; make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) -> diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 5de23e8..cc3a8f3 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -66,6 +66,7 @@ api_smoke_test() -> BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), + {ok, {true, {0,<<0:(20*8)/big>>}}} = ?MUT:wedge_status(Prox1), {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44), From ca854373f8026fc63da59811d80a059886e94f15 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 17:07:03 +0900 Subject: [PATCH 05/12] WIP: new test fails: check for not wedged --- test/machi_flu_psup_test.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 9b76866..9bc9610 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -74,18 +74,21 @@ partial_stop_restart2() -> Dict = orddict:from_list(Ps), [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), + Start = fun({_,P}) -> + #p_srvr{name=Name, port=Port, props=Dir} = P, + {ok, _} = machi_flu_psup:start_flu_package( + Name, Port, Dir, [{active_mode,false}]) + end, + WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> + machi_flu1_client:wedge_status(Addr, TcpPort) + end, try - Start = fun({_,P}) -> - #p_srvr{name=Name, port=Port, props=Dir} = P, - {ok, _} = machi_flu_psup:start_flu_package( - Name, Port, Dir, [{active_mode,false}]) - end, [Start(P) || P <- Ps], - %% TODO: Confirm that all FLUs are wedged + [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged [machi_chain_manager1:set_chain_members(ChMgr, Dict) || ChMgr <- ChMgrs ], - %% TODO: Confirm that all FLUs are wedged + [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), [begin @@ -106,6 +109,7 @@ partial_stop_restart2() -> end || ProjType <- [public, private], PStore <- PStores ], Epoch_m = Proj_m#projection_v1.epoch_number, %% TODO: Confirm that all FLUs are *not* wedged + [{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % all are wedged %% Stop all but 'a'. [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], From 7906e6c23599fedcd2d96cd6171c91cd330cf637 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 18:17:41 +0900 Subject: [PATCH 06/12] WIP: basic wedge notifications now working --- src/machi_chain_manager1.erl | 3 +- src/machi_flu1.erl | 22 +++++++----- src/machi_flu_psup.erl | 2 +- src/machi_projection_store.erl | 65 ++++++++++++++++++++++++++++------ test/machi_flu_psup_test.erl | 8 +++-- 5 files changed, 77 insertions(+), 23 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 57de755..12b885e 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -70,7 +70,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([make_chmgr_regname/1, projection_transitions_are_sane/2]). +-export([make_chmgr_regname/1, projection_transitions_are_sane/2, + inner_projection_exists/1, inner_projection_or_self/1]). -ifdef(TEST). diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 4265718..28cf404 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -66,7 +66,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). --export([start_link/1, stop/1]). +-export([start_link/1, stop/1, + update_wedge_state/3]). -export([make_listener_regname/1, make_projection_server_regname/1]). -record(state, { @@ -75,7 +76,7 @@ append_pid :: pid(), tcp_port :: non_neg_integer(), data_dir :: string(), - wedge = true :: boolean(), + wedged = true :: boolean(), epoch_id :: 'undefined' | machi_projection:pv_epoch(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist @@ -94,13 +95,17 @@ stop(Pid) -> error end. +update_wedge_state(PidSpec, Boolean, EpochId) + when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> + PidSpec ! {wedge_state_change, Boolean, EpochId}. + %%%%%%%%%%%%%%%%%%%%%%%%%%%% main2(FluName, TcpPort, DataDir, Rest) -> S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, - wedge=true, + wedged=true, epoch_id=undefined, props=Rest}, AppendPid = start_append_server(S0), @@ -160,6 +165,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> listen_server_loop(LSock, S). run_append_server(FluPid, #state{flu_name=Name}=S) -> + %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), append_server_loop(FluPid, S). @@ -176,12 +182,12 @@ append_server_loop(FluPid, #state{data_dir=DataDir}=S) -> DataDir, AppendServerPid) end), %% DataDir, FluPid) end), append_server_loop(FluPid, S); - {wedge_state_change, Boolean} -> - %% append_server_loop(FluPid, S#state{wedge=Boolean}) - append_server_loop(FluPid, S); + {wedge_state_change, Boolean, EpochId} -> + append_server_loop(FluPid, S#state{wedged=Boolean, + epoch_id=EpochId}); {wedge_status, FromPid} -> - #state{wedge=Wedge_p, epoch_id=EpochId} = S, - FromPid ! {wedge_status_reply, Wedge_p, EpochId}, + #state{wedged=Wedged_p, epoch_id=EpochId} = S, + FromPid ! {wedge_status_reply, Wedged_p, EpochId}, append_server_loop(FluPid, S); Else -> io:format(user, "append_server_loop: WHA? ~p\n", [Else]), diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 4405bf6..a4fe0ad 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -66,7 +66,7 @@ init([FluName, TcpPort, DataDir, Props0]) -> {use_partition_simulator,false}|Props0], ProjSpec = {ProjRegName, {machi_projection_store, start_link, - [ProjRegName, DataDir, zarfus_todo]}, + [ProjRegName, DataDir, FluName]}, permanent, 5000, worker, []}, MgrSpec = {make_mgr_supname(FluName), {machi_chain_manager1, start_link, diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index ffe8786..cc74c7c 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -60,7 +60,6 @@ -record(state, { public_dir = "" :: string(), private_dir = "" :: string(), - wedged = true :: boolean(), wedge_notify_pid :: pid() | atom(), max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} @@ -170,17 +169,16 @@ init([DataDir, NotifyWedgeStateChanges]) -> {ok, #state{public_dir=PublicDir, private_dir=PrivateDir, - wedged=true, wedge_notify_pid=NotifyWedgeStateChanges, max_public_epoch=MaxPublicEpoch, max_private_epoch=MaxPrivateEpoch}}. handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - EpochT = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + EpochId = if ProjType == public -> S#state.max_public_epoch; + ProjType == private -> S#state.max_private_epoch end, - {reply, {{ok, EpochT}, LC2}, S}; + {reply, {{ok, EpochId}, LC2}, S}; handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch; @@ -258,15 +256,29 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> ok = file:write(FH, term_to_binary(Proj)), ok = file:sync(FH), ok = file:close(FH), - EpochT = {Epoch, Proj#projection_v1.epoch_csum}, + EffectiveProj = machi_chain_manager1:inner_projection_or_self(Proj), + EffectiveEpoch = EffectiveProj#projection_v1.epoch_number, + EpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum}, + %% NewS = if ProjType == public, Epoch > element(1, S#state.max_public_epoch) -> - %io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]), - S#state{max_public_epoch=EpochT, wedged=true}; + if Epoch == EffectiveEpoch -> + %% This is a regular projection, i.e., + %% does not have an inner proj. + update_wedge_state( + S#state.wedge_notify_pid, true, EpochId); + Epoch /= EffectiveEpoch -> + %% This projection has an inner proj. + %% The outer proj is flapping, so we do + %% not bother wedging. + ok + end, + S#state{max_public_epoch=EpochId}; ProjType == private, Epoch > element(1, S#state.max_private_epoch) -> - %io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]), - S#state{max_private_epoch=EpochT, wedged=false}; + update_wedge_state( + S#state.wedge_notify_pid, false, EpochId), + S#state{max_private_epoch=EpochId}; true -> S end, @@ -275,6 +287,39 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> {{error, Else}, S} end. +update_wedge_state(PidSpec, Boolean, {0,_}=EpochId) -> + %% Epoch #0 is a special case: no projection has been written yet. + %% However, given the way that machi_flu_psup starts the + %% processes, we are roughly 100% certain that the FLU for PidSpec + %% is not yet running. + catch machi_flu1:update_wedge_state(PidSpec, Boolean, EpochId); +update_wedge_state(PidSpec, Boolean, EpochId) -> + %% We have a race problem with the startup order by machi_flu_psup: + %% the order is projection store (me!), projection manager, FLU. + %% PidSpec is the FLU. It's almost certainly a registered name. + %% Wait for it to exist before sending a message to it. Racing with + %% supervisor startup/shutdown/restart is ok. + ok = wait_for_liveness(PidSpec, 10*1000), + machi_flu1:update_wedge_state(PidSpec, Boolean, EpochId). + +wait_for_liveness(Pid, _WaitTime) when is_pid(Pid) -> + ok; +wait_for_liveness(PidSpec, WaitTime) -> + wait_for_liveness(PidSpec, os:timestamp(), WaitTime). + +wait_for_liveness(PidSpec, StartTime, WaitTime) -> + case whereis(PidSpec) of + undefined -> + case timer:now_diff(os:timestamp(), StartTime) div 1000 of + X when X < WaitTime -> + io:format(user, "\nYOO ~p ~p\n", [PidSpec, lists:sort(registered())]), + timer:sleep(1), + wait_for_liveness(PidSpec, StartTime, WaitTime) + end; + _SomePid -> + ok + end. + pick_path(public, S) -> S#state.public_dir; pick_path(private, S) -> diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 9bc9610..7aba38d 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -88,7 +88,7 @@ partial_stop_restart2() -> [machi_chain_manager1:set_chain_members(ChMgr, Dict) || ChMgr <- ChMgrs ], - [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged + [{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % *not* wedged {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), [begin @@ -108,8 +108,10 @@ partial_stop_restart2() -> PStore, ProjType) end || ProjType <- [public, private], PStore <- PStores ], Epoch_m = Proj_m#projection_v1.epoch_number, - %% TODO: Confirm that all FLUs are *not* wedged - [{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % all are wedged + %% Confirm that all FLUs are *not* wedged, with correct proj & epoch + Proj_mCSum = Proj_m#projection_v1.epoch_csum, + [{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged + P <- Ps], %% Stop all but 'a'. [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], From 376c4a9ae18a0a659fa954de7e2dc336572f6c97 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 18:22:44 +0900 Subject: [PATCH 07/12] WIP: failing test for append while wedged --- test/machi_flu_psup_test.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 7aba38d..5b289d5 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -82,9 +82,15 @@ partial_stop_restart2() -> WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> machi_flu1_client:wedge_status(Addr, TcpPort) end, + Append = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> + machi_flu1_client:append_chunk(Addr, TcpPort, + ?DUMMY_PV1_EPOCH, + <<"prefix">>, <<"data">>) + end, try [Start(P) || P <- Ps], [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged + [bummer = Append(P) || P <- Ps], % all are wedged [machi_chain_manager1:set_chain_members(ChMgr, Dict) || ChMgr <- ChMgrs ], From 316126fa59efdcdc354cb5c88d6550d305adda03 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 19:07:57 +0900 Subject: [PATCH 08/12] WIP: additional tests for wedge state --- src/machi_flu1.erl | 11 ++++++--- src/machi_flu1_client.erl | 48 +++++++++++++++++++++++++----------- test/machi_flu_psup_test.erl | 11 ++++++--- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 28cf404..56bdd60 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -77,7 +77,7 @@ tcp_port :: non_neg_integer(), data_dir :: string(), wedged = true :: boolean(), - epoch_id :: 'undefined' | machi_projection:pv_epoch(), + epoch_id :: 'undefined' | pv1_epoch(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist }). @@ -174,9 +174,12 @@ listen_server_loop(LSock, S) -> spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(FluPid, #state{data_dir=DataDir}=S) -> +append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> AppendServerPid = self(), receive + {seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p -> + From ! wedged, + append_server_loop(FluPid, S); {seq_append, From, Prefix, Chunk, CSum} -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, AppendServerPid) end), @@ -298,7 +301,9 @@ do_net_server_append2(FluName, Sock, LenHex, Prefix) -> {assignment, Offset, File} -> OffsetHex = machi_util:bin_to_hexstr(<>), Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]), - ok = gen_tcp:send(Sock, Out) + ok = gen_tcp:send(Sock, Out); + wedged -> + ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>) after 10*1000 -> ok = gen_tcp:send(Sock, "TIMEOUT\n") end. diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 8a1fc1e..50073ce 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -56,6 +56,7 @@ -type chunk_s() :: binary(). % server always uses binary() -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). +-type error_general() :: 'bad_arg' | 'wedged'. -type epoch_csum() :: binary(). -type epoch_num() :: -1 | non_neg_integer(). -type epoch_id() :: {epoch_num(), epoch_csum()}. @@ -76,7 +77,7 @@ %% with `Prefix'. -spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, term()}. + {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk2(Sock, EpochID, Prefix, Chunk). @@ -85,7 +86,7 @@ append_chunk(Sock, EpochID, Prefix, Chunk) -> -spec append_chunk(inet_host(), inet_port(), epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, term()}. + {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> Sock = machi_util:connect(Host, TcpPort), try @@ -97,7 +98,9 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | {error, term()}. + {ok, chunk_s()} | + {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, term()}. read_chunk(Sock, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> read_chunk2(Sock, EpochID, File, Offset, Size). @@ -106,7 +109,9 @@ read_chunk(Sock, EpochID, File, Offset, Size) -spec read_chunk(inet_host(), inet_port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | {error, term()}. + {ok, chunk_s()} | + {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> Sock = machi_util:connect(Host, TcpPort), @@ -119,14 +124,17 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size) %% @doc Fetch the list of chunk checksums for `File'. -spec checksum_list(port(), epoch_id(), file_name()) -> - {ok, [chunk_csum()]} | {error, term()}. + {ok, [chunk_csum()]} | + {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, term()}. checksum_list(Sock, EpochID, File) when is_port(Sock) -> checksum_list2(Sock, EpochID, File). %% @doc Fetch the list of chunk checksums for `File'. -spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) -> - {ok, [chunk_csum()]} | {error, term()}. + {ok, [chunk_csum()]} | + {error, error_general() | 'no_such_file'} | {error, term()}. checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try @@ -157,7 +165,7 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> %% @doc Fetch the wedge status from the remote FLU. -spec wedge_status(port()) -> - {ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}. + {ok, {boolean(), pv1_epoch()}} | {error, term()}. wedge_status(Sock) when is_port(Sock) -> wedge_status2(Sock). @@ -165,7 +173,7 @@ wedge_status(Sock) when is_port(Sock) -> %% @doc Fetch the wedge status from the remote FLU. -spec wedge_status(inet_host(), inet_port()) -> - {ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}. + {ok, {boolean(), pv1_epoch()}} | {error, term()}. wedge_status(Host, TcpPort) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try @@ -323,7 +331,7 @@ quit(Sock) when is_port(Sock) -> %% `File' at `Offset'. -spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, term()}. + ok | {error, error_general()} | {error, term()}. write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> write_chunk2(Sock, EpochID, File, Offset, Chunk). @@ -333,7 +341,7 @@ write_chunk(Sock, EpochID, File, Offset, Chunk) -spec write_chunk(inet_host(), inet_port(), epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, term()}. + ok | {error, error_general()} | {error, term()}. write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Sock = machi_util:connect(Host, TcpPort), @@ -347,7 +355,7 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) %% migrated. -spec delete_migration(port(), epoch_id(), file_name()) -> - ok | {error, term()}. + ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(Sock, EpochID, File) when is_port(Sock) -> delete_migration2(Sock, EpochID, File). @@ -355,7 +363,7 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) -> %% migrated. -spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, term()}. + ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try @@ -368,7 +376,7 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %% erasure coded. -spec trunc_hack(port(), epoch_id(), file_name()) -> - ok | {error, term()}. + ok | {error, error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Sock, EpochID, File) when is_port(Sock) -> trunc_hack2(Sock, EpochID, File). @@ -376,7 +384,7 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) -> %% erasure coded. -spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, term()}. + ok | {error, error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try @@ -410,6 +418,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> {ok, {Offset, Len, Path}}; <<"ERROR BAD-ARG", _/binary>> -> {error, bad_arg}; + <<"ERROR WEDGED", _/binary>> -> + {error, wedged}; <<"ERROR ", Rest/binary>> -> {error, Rest} end @@ -454,6 +464,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> {error, bad_arg}; <<"OR PARTIAL-READ\n">> -> {error, partial_read}; + <<"OR WEDGED", _/binary>> -> + {error, wedged}; _ -> {error, Else2} end; @@ -538,6 +550,8 @@ checksum_list2(Sock, EpochID, File) -> {error, no_such_file}; {ok, <<"ERROR BAD-ARG", _/binary>>} -> {error, bad_arg}; + {ok, <<"ERROR WEDGED", _/binary>>} -> + {error, wedged}; {ok, Else} -> throw({server_protocol_error, Else}) end @@ -599,6 +613,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> ok; <<"ERROR BAD-ARG", _/binary>> -> {error, bad_arg}; + <<"ERROR WEDGED", _/binary>> -> + {error, wedged}; <<"ERROR ", _/binary>>=Else -> {error, {server_said, Else}} end @@ -626,6 +642,8 @@ delete_migration2(Sock, EpochID, File) -> {error, no_such_file}; {ok, <<"ERROR BAD-ARG", _/binary>>} -> {error, bad_arg}; + {ok, <<"ERROR WEDGED", _/binary>>} -> + {error, wedged}; {ok, Else} -> throw({server_protocol_error, Else}) end @@ -653,6 +671,8 @@ trunc_hack2(Sock, EpochID, File) -> {error, no_such_file}; {ok, <<"ERROR BAD-ARG", _/binary>>} -> {error, bad_arg}; + {ok, <<"ERROR WEDGED", _/binary>>} -> + {error, wedged}; {ok, Else} -> throw({server_protocol_error, Else}) end diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 5b289d5..2c04ef5 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -90,11 +90,12 @@ partial_stop_restart2() -> try [Start(P) || P <- Ps], [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged - [bummer = Append(P) || P <- Ps], % all are wedged + [{error,wedged} = Append(P) || P <- Ps], % all are wedged [machi_chain_manager1:set_chain_members(ChMgr, Dict) || ChMgr <- ChMgrs ], [{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % *not* wedged + [{ok,_} = Append(P) || P <- Ps], % *not* wedged {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), [begin @@ -118,6 +119,7 @@ partial_stop_restart2() -> Proj_mCSum = Proj_m#projection_v1.epoch_csum, [{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged P <- Ps], + [{ok,_} = Append(P) || P <- Ps], % *not* wedged %% Stop all but 'a'. [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], @@ -129,11 +131,14 @@ partial_stop_restart2() -> %% Remember: 'a' is not in active mode. {ok, Proj_m} = machi_projection_store:read_latest_projection( hd(PStores), private), - %% TODO: confirm that 'a' is wedged + %% Confirm that 'a' is wedged + {error, wedged} = Append(hd(Ps)), + %% Iterate through humming consensus once {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( hd(ChMgrs)), true = (Epoch_n > Epoch_m), - %% TODO: confirm that 'b' is wedged + %% Confirm that 'a' is *not* wedged + {ok, _} = Append(hd(Ps)), ok after From 6f7818fca757a00875a53d1f3e869e22a319919e Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 19:50:47 +0900 Subject: [PATCH 09/12] WIP: additional tests for wedge state --- src/machi_flu1.erl | 77 +++++++++++++++++++++++++----------- test/machi_flu_psup_test.erl | 9 +++++ 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 56bdd60..f6ebb06 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -77,6 +77,7 @@ tcp_port :: non_neg_integer(), data_dir :: string(), wedged = true :: boolean(), + etstab :: ets:tid(), epoch_id :: 'undefined' | pv1_epoch(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist @@ -101,14 +102,23 @@ update_wedge_state(PidSpec, Boolean, EpochId) %%%%%%%%%%%%%%%%%%%%%%%%%%%% +ets_table_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_epoch"); +ets_table_name(FluName) when is_binary(FluName) -> + list_to_atom(binary_to_list(FluName) ++ "_epoch"). + main2(FluName, TcpPort, DataDir, Rest) -> S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, wedged=true, + etstab=ets_table_name(FluName), epoch_id=undefined, props=Rest}, - AppendPid = start_append_server(S0), + AppendPid = start_append_server(S0, self()), + receive + append_server_ack -> ok + end, {_ProjRegName, ProjectionPid} = case proplists:get_value(projection_store_registered_name, Rest) of undefined -> @@ -150,9 +160,9 @@ main2(FluName, TcpPort, DataDir, Rest) -> start_listen_server(S) -> proc_lib:spawn_link(fun() -> run_listen_server(S) end). -start_append_server(S) -> +start_append_server(S, AckPid) -> FluPid = self(), - proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end). + proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end). %% start_projection_server(S) -> %% spawn_link(fun() -> run_projection_server(S) end). @@ -164,10 +174,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(FluPid, #state{flu_name=Name}=S) -> +run_append_server(FluPid, AckPid, #state{flu_name=Name}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), - append_server_loop(FluPid, S). + TID = ets:new(ets_table_name(Name), + [set, protected, named_table, {read_concurrency, true}]), + ets:insert(TID, {epoch, {true, {-1, <<>>}}}), + AckPid ! append_server_ack, + append_server_loop(FluPid, S#state{etstab=TID}). listen_server_loop(LSock, S) -> {ok, Sock} = gen_tcp:accept(LSock), @@ -186,6 +200,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> %% DataDir, FluPid) end), append_server_loop(FluPid, S); {wedge_state_change, Boolean, EpochId} -> + true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), append_server_loop(FluPid, S#state{wedged=Boolean, epoch_id=EpochId}); {wedge_status, FromPid} -> @@ -217,10 +232,14 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> Prefix:PrefixLenLF/binary, "\n">> -> do_net_server_append(FluName, Sock, LenHex, Prefix); <<"R ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> - do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir); + {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), + io:format(user, "TT wedged ~p, CurrentEpochId ~p\n", [Wedged_p, CurrentEpochId]), + timer:sleep(50), + do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId); <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> do_net_server_listing(Sock, DataDir); <<"C ", @@ -238,7 +257,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> _EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> - do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir); + do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, + <<"fixme1">>, false, <<"fixme2">>); %% For data migration only. <<"DEL-migration ", _EpochIDRaw:(?EpochIDSpace)/binary, @@ -329,7 +349,8 @@ do_wedge_status(FluName, Sock) -> end, ok = gen_tcp:send(Sock, Reply). -do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> +do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) -> DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of {ok, Bytes} when byte_size(Bytes) == Len -> @@ -347,20 +368,26 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> end end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - [read, binary, raw], DoItFun). + [read, binary, raw], DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId). do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun) -> - case sanitize_file_string(FileBin) of - ok -> + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId) -> + case {Wedged_p, sanitize_file_string(FileBin)} of + {false, ok} -> do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, - DataDir, FileOpts, DoItFun); - _ -> + DataDir, FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId); + {true, _} -> + ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); + {_, __} -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) end. do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun) -> + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId) -> <> = machi_util:hexstr_to_bin(OffsetHex), <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), @@ -375,24 +402,29 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, {error, enoent} when OptsHasWrite -> do_net_server_readwrite_common( Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun); + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId); _Else -> %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) end. -do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) -> +do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) -> CSumPath = machi_util:make_checksum_filename(DataDir, FileBin), case file:open(CSumPath, [append, raw, binary, delayed_write]) of {ok, FHc} -> - do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc); + do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, + EpochIDRaw, Wedged_p, CurrentEpochId); {error, enoent} -> ok = filelib:ensure_dir(CSumPath), - do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) + do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) end. -do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) -> +do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, + EpochIDRaw, Wedged_p, CurrentEpochId) -> DoItFun = fun(FHd, Offset, Len) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len), @@ -411,7 +443,8 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) -> end end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - [write, read, binary, raw], DoItFun). + [write, read, binary, raw], DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId). perhaps_do_net_server_ec_read(Sock, FH) -> case file:pread(FH, 0, ?MINIMUM_OFFSET) of diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 2c04ef5..94ecf69 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -133,6 +133,15 @@ partial_stop_restart2() -> hd(PStores), private), %% Confirm that 'a' is wedged {error, wedged} = Append(hd(Ps)), + {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps), + {error, wedged} = machi_flu1_client:read_chunk( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, + <<>>, 99999999, 1), + {error, wedged} = machi_flu1_client:checksum_list( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>), + {error, wedged} = machi_flu1_client:list_files( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH), + %% Iterate through humming consensus once {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( hd(ChMgrs)), From 0dd9282789702fc6e1af11e0810771a33029af49 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 21:24:07 +0900 Subject: [PATCH 10/12] WIP: fix other broken eunit tests, surrounding wedge state --- src/machi_flu1.erl | 27 +++++++++++++-------------- test/machi_admin_util_test.erl | 4 +++- test/machi_flu1_test.erl | 5 +++-- test/machi_proxy_flu1_client_test.erl | 10 ++++++---- 4 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index f6ebb06..e577ba7 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -108,13 +108,20 @@ ets_table_name(FluName) when is_binary(FluName) -> list_to_atom(binary_to_list(FluName) ++ "_epoch"). main2(FluName, TcpPort, DataDir, Rest) -> + {Props, DbgProps} = case proplists:get_value(dbg, Rest) of + undefined -> + {Rest, []}; + DPs -> + {lists:keydelete(dbg, 1, Rest), DPs} + end, S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, - wedged=true, + wedged=proplists:get_value(initial_wedged, DbgProps, true), etstab=ets_table_name(FluName), epoch_id=undefined, - props=Rest}, + dbg_props=DbgProps, + props=Props}, AppendPid = start_append_server(S0, self()), receive append_server_ack -> ok @@ -131,14 +138,7 @@ main2(FluName, TcpPort, DataDir, Rest) -> end, S1 = S0#state{append_pid=AppendPid, proj_store=ProjectionPid}, - S2 = case proplists:get_value(dbg, Rest) of - undefined -> - S1; - DbgProps -> - S1#state{dbg_props=DbgProps, - props=lists:keydelete(dbg, 1, Rest)} - end, - ListenPid = start_listen_server(S2), + ListenPid = start_listen_server(S1), Config_e = machi_util:make_config_filename(DataDir, "unused"), ok = filelib:ensure_dir(Config_e), @@ -174,12 +174,13 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(FluPid, AckPid, #state{flu_name=Name}=S) -> +run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), TID = ets:new(ets_table_name(Name), [set, protected, named_table, {read_concurrency, true}]), - ets:insert(TID, {epoch, {true, {-1, <<>>}}}), + InitialWedged = proplists:get_value(initial_wedged, DbgProps, true), + ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}), AckPid ! append_server_ack, append_server_loop(FluPid, S#state{etstab=TID}). @@ -236,8 +237,6 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), - io:format(user, "TT wedged ~p, CurrentEpochId ~p\n", [Wedged_p, CurrentEpochId]), - timer:sleep(50), do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, EpochIDRaw, Wedged_p, CurrentEpochId); <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index ded534a..09d4717 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -35,7 +35,9 @@ verify_file_checksums_test() -> Host = "localhost", TcpPort = 32958, DataDir = "./data", - FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir), + W_props = [{initial_wedged, false}], + FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir, + W_props), Sock1 = machi_util:connect(Host, TcpPort), try Prefix = <<"verify_prefix">>, diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index a521bd0..b66dcc3 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -55,7 +55,8 @@ flu_smoke_test() -> Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", - FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir), + W_props = [{initial_wedged, false}], + FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir, W_props), try {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort, ?DUMMY_PV1_EPOCH, @@ -64,7 +65,7 @@ flu_smoke_test() -> ?DUMMY_PV1_EPOCH, BadFile), {ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH), - {ok, {true, {0,<<0:(20*8)/big>>}}} = ?FLU_C:wedge_status(Host, TcpPort), + {ok, {false, _}} = ?FLU_C:wedge_status(Host, TcpPort), Chunk1 = <<"yo!">>, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index cc3a8f3..624856e 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -33,7 +33,9 @@ api_smoke_test() -> Host = "localhost", TcpPort = 57124, DataDir = "./data.api_smoke_flu", - FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir), + W_props = [{initial_wedged, false}], + FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir, + W_props), erase(flu_pid), try @@ -54,19 +56,19 @@ api_smoke_test() -> infinity), %% Start the FLU again, we should be able to do stuff immediately FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir, - [save_data_dir]), + [save_data_dir|W_props]), put(flu_pid, FLU1b), MyChunk = <<"my chunk data">>, {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk, - infinity), + infinity), {ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), - {ok, {true, {0,<<0:(20*8)/big>>}}} = ?MUT:wedge_status(Prox1), + {ok, {false, _}} = ?MUT:wedge_status(Prox1), {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44), From dc43a32945244ffbd44268d18f47d57c45fc94f4 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 21:37:19 +0900 Subject: [PATCH 11/12] WIP: tests for wedge state all working --- src/machi_flu1.erl | 35 +++++++++++++++++++++++------------ src/machi_flu1_client.erl | 14 ++++++++++---- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index e577ba7..47bd52a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -103,9 +103,9 @@ update_wedge_state(PidSpec, Boolean, EpochId) %%%%%%%%%%%%%%%%%%%%%%%%%%%% ets_table_name(FluName) when is_atom(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_epoch"); -ets_table_name(FluName) when is_binary(FluName) -> - list_to_atom(binary_to_list(FluName) ++ "_epoch"). + list_to_atom(atom_to_list(FluName) ++ "_epoch"). +%% ets_table_name(FluName) when is_binary(FluName) -> +%% list_to_atom(binary_to_list(FluName) ++ "_epoch"). main2(FluName, TcpPort, DataDir, Rest) -> {Props, DbgProps} = case proplists:get_value(dbg, Rest) of @@ -236,15 +236,14 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> - {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochIDRaw, S); <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> - do_net_server_listing(Sock, DataDir); + do_net_server_listing(Sock, DataDir, S); <<"C ", _EpochIDRaw:(?EpochIDSpace)/binary, File:CSumFileLenLF/binary, "\n">> -> - do_net_server_checksum_listing(Sock, File, DataDir); + do_net_server_checksum_listing(Sock, File, DataDir, S); <<"QUIT\n">> -> catch gen_tcp:close(Sock), exit(normal); @@ -349,7 +348,8 @@ do_wedge_status(FluName, Sock) -> ok = gen_tcp:send(Sock, Reply). do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochIDRaw, S) -> + {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of {ok, Bytes} when byte_size(Bytes) == Len -> @@ -469,7 +469,15 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) -> <> = Rest2, ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). -do_net_server_listing(Sock, DataDir) -> +do_net_server_listing(Sock, DataDir, S) -> + {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), + if Wedged_p -> + ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); + true -> + do_net_server_listing2(Sock, DataDir) + end. + +do_net_server_listing2(Sock, DataDir) -> {_, WildPath} = machi_util:make_data_filename(DataDir, ""), Files = filelib:wildcard("*", WildPath), Out = ["OK\n", @@ -484,9 +492,12 @@ do_net_server_listing(Sock, DataDir) -> ], ok = gen_tcp:send(Sock, Out). -do_net_server_checksum_listing(Sock, File, DataDir) -> - case sanitize_file_string(File) of - ok -> +do_net_server_checksum_listing(Sock, File, DataDir, S) -> + {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), + case {Wedged_p, sanitize_file_string(File)} of + {true, _} -> + ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); + {false, ok} -> do_net_server_checksum_listing2(Sock, File, DataDir); _ -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 50073ce..1311d74 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -488,10 +488,16 @@ list2(Sock, EpochID) -> EpochIDRaw = <>, ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]), ok = inet:setopts(Sock, [{packet, line}]), - {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0), - Res = list3(gen_tcp:recv(Sock, 0), Sock), - ok = inet:setopts(Sock, [{packet, raw}]), - {ok, Res} + case gen_tcp:recv(Sock, 0) of + {ok, <<"OK\n">>} -> + Res = list3(gen_tcp:recv(Sock, 0), Sock), + ok = inet:setopts(Sock, [{packet, raw}]), + {ok, Res}; + {ok, <<"ERROR WEDGED\n">>} -> + {error, wedged}; + {ok, <<"ERROR ", Rest/binary>>} -> + {error, Rest} + end catch throw:Error -> Error; From 2ef43bead3e18edcfac53af820fedc7fc3c037f5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 21:41:08 +0900 Subject: [PATCH 12/12] WIP: tests for wedge state all working --- TODO-shortterm.org | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index cc81b8b..8e01a13 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -30,8 +30,8 @@ func, and pattern match Erlang style in that func. ** DONE Move prototype/chain-manager code to "top" of source tree *** DONE Preserve current test code (leave as-is? tiny changes?) *** DONE Make chain manager code flexible enough to run "real world" or "sim" -** TODO Add projection wedging logic to each FLU. -** TODO Implement real data repair, orchestrated by the chain manager +** DONE Add projection wedging logic to each FLU. +** Started.... Implement real data repair, orchestrated by the chain manager ** TODO Change all protocol ops to enforce the epoch ID - Add no-wedging state to make testing easier?