diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 7696abf..2e70005 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -85,7 +85,7 @@ -define(SERVER_CMD_READ_TIMEOUT, 600*1000). -export([start_link/1, stop/1, - update_wedge_state/3]). + update_wedge_state/3, wedge_myself/2]). -export([make_listener_regname/1, make_projection_server_regname/1]). -export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3, decode_csum_file_entry/1, @@ -128,6 +128,10 @@ update_wedge_state(PidSpec, Boolean, EpochId) when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> PidSpec ! {wedge_state_change, Boolean, EpochId}. +wedge_myself(PidSpec, EpochId) + when is_tuple(EpochId) -> + PidSpec ! {wedge_myself, EpochId}. + %%%%%%%%%%%%%%%%%%%%%%%%%%%% ets_table_name(FluName) when is_atom(FluName) -> @@ -246,7 +250,8 @@ listen_server_loop(LSock, S) -> spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> +append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p, + epoch_id=OldEpochId}=S) -> AppendServerPid = self(), receive {seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p -> @@ -257,10 +262,26 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> Chunk, CSum, Extra, DataDir, AppendServerPid) end), append_server_loop(FluPid, S); - {wedge_state_change, Boolean, EpochId} -> - true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), - append_server_loop(FluPid, S#state{wedged=Boolean, - epoch_id=EpochId}); + {wedge_myself, WedgeEpochId} -> + if WedgeEpochId == OldEpochId -> + true = ets:insert(S#state.etstab, + {epoch, {true, OldEpochId}}), + append_server_loop(FluPid, S#state{wedged=true}); + true -> + append_server_loop(FluPid, S) + end; + {wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} -> + OldEpoch = case OldEpochId of {OldE, _} -> OldE; + undefined -> -1 + end, + if NewEpoch >= OldEpoch -> + true = ets:insert(S#state.etstab, + {epoch, {Boolean, NewEpochId}}), + append_server_loop(FluPid, S#state{wedged=Boolean, + epoch_id=NewEpochId}); + true -> + append_server_loop(FluPid, S) + end; {wedge_status, FromPid} -> #state{wedged=Wedged_p, epoch_id=EpochId} = S, FromPid ! {wedge_status_reply, Wedged_p, EpochId}, @@ -350,8 +371,9 @@ do_pb_ll_request2(EpochID, CMD, S) -> true -> %% We're at same epoch # but different checksum, or %% we're at a newer/bigger epoch #. - io:format(user, "\n\nTODO: wedge myself!\n\n", []), - todo_wedge_myself + io:format(user, "\n\nTODO/monitor: wedging myself!\n\n",[]), + wedge_myself(S#state.flu_name, CurrentEpochID), + ok end, {{error, bad_epoch}, S}; true ->