Sequencer changes file sequence number when epoch_id change is detected

This commit is contained in:
Scott Lystig Fritchie 2015-07-03 01:58:13 +09:00
parent 039fd5fb78
commit ff66638eb3
3 changed files with 56 additions and 39 deletions

View file

@ -62,13 +62,6 @@
%% the FLU keep track of the epoch number of the last file write (and %% the FLU keep track of the epoch number of the last file write (and
%% perhaps last metadata write), as an optimization for inter-FLU data %% perhaps last metadata write), as an optimization for inter-FLU data
%% replication/chain repair. %% replication/chain repair.
%%
%% TODO Section 4.2 ("The Sequencer") says that the sequencer must
%% change its file assignments to new & unique names whenever we move
%% to wedge state. This is not yet implemented. In the current
%% Erlang process scheme (which will probably be changing soon), a
%% simple implementation would stop all existing processes that are
%% running run_seq_append_server().
-module(machi_flu1). -module(machi_flu1).
@ -254,12 +247,13 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p,
epoch_id=OldEpochId}=S) -> epoch_id=OldEpochId}=S) ->
AppendServerPid = self(), AppendServerPid = self(),
receive receive
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p -> {seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Wedged_p ->
From ! wedged, From ! wedged,
append_server_loop(FluPid, S); append_server_loop(FluPid, S);
{seq_append, From, Prefix, Chunk, CSum, Extra} -> {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} ->
spawn(fun() -> append_server_dispatch(From, Prefix, spawn(fun() -> append_server_dispatch(From, Prefix,
Chunk, CSum, Extra, Chunk, CSum, Extra, EpochID,
DataDir, AppendServerPid) end), DataDir, AppendServerPid) end),
append_server_loop(FluPid, S); append_server_loop(FluPid, S);
{wedge_myself, WedgeEpochId} -> {wedge_myself, WedgeEpochId} ->
@ -359,7 +353,7 @@ do_pb_ll_request(PB_request, S) ->
do_pb_ll_request2(EpochID, CMD, S) -> do_pb_ll_request2(EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
if Wedged_p == true -> if Wedged_p == true ->
{{error, wedged}, S}; {{error, wedged}, S#state{epoch_id=CurrentEpochID}};
is_tuple(EpochID) is_tuple(EpochID)
andalso andalso
EpochID /= CurrentEpochID -> EpochID /= CurrentEpochID ->
@ -370,13 +364,12 @@ do_pb_ll_request2(EpochID, CMD, S) ->
true -> true ->
%% We're at same epoch # but different checksum, or %% We're at same epoch # but different checksum, or
%% we're at a newer/bigger epoch #. %% we're at a newer/bigger epoch #.
io:format(user, "\n\nTODO/monitor: wedging myself!\n\n",[]),
wedge_myself(S#state.flu_name, CurrentEpochID), wedge_myself(S#state.flu_name, CurrentEpochID),
ok ok
end, end,
{{error, bad_epoch}, S}; {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
true -> true ->
do_pb_ll_request3(CMD, S) do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
end. end.
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
@ -469,11 +462,13 @@ do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
end. end.
do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
ChunkExtra, #state{flu_name=FluName}=_S) -> ChunkExtra, #state{flu_name=FluName,
epoch_id=EpochID}=_S) ->
%% TODO: Do anything with PKey? %% TODO: Do anything with PKey?
try try
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, ChunkExtra}, FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum,
ChunkExtra, EpochID},
receive receive
{assignment, Offset, File} -> {assignment, Offset, File} ->
Size = iolist_size(Chunk), Size = iolist_size(Chunk),
@ -664,9 +659,10 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
{error, bad_arg} {error, bad_arg}
end. end.
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) -> append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID,
Pid = write_server_get_pid(Prefix, DataDir, LinkPid), DataDir, LinkPid) ->
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra}, Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid),
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID},
exit(normal). exit(normal).
sanitize_file_string(Str) -> sanitize_file_string(Str) ->
@ -700,12 +696,12 @@ sync_checksum_file(File) ->
end end
end. end.
write_server_get_pid(Prefix, DataDir, LinkPid) -> write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) ->
case write_server_find_pid(Prefix) of case write_server_find_pid(Prefix) of
undefined -> undefined ->
start_seq_append_server(Prefix, DataDir, LinkPid), start_seq_append_server(Prefix, EpochID, DataDir, LinkPid),
timer:sleep(1), timer:sleep(1),
write_server_get_pid(Prefix, DataDir, LinkPid); write_server_get_pid(Prefix, EpochID, DataDir, LinkPid);
Pid -> Pid ->
Pid Pid
end. end.
@ -714,7 +710,7 @@ write_server_find_pid(Prefix) ->
FluName = machi_util:make_regname(Prefix), FluName = machi_util:make_regname(Prefix),
whereis(FluName). whereis(FluName).
start_seq_append_server(Prefix, DataDir, AppendServerPid) -> start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) ->
proc_lib:spawn_link(fun() -> proc_lib:spawn_link(fun() ->
%% The following is only necessary to %% The following is only necessary to
%% make nice process relationships in %% make nice process relationships in
@ -722,21 +718,20 @@ start_seq_append_server(Prefix, DataDir, AppendServerPid) ->
put('$ancestors', [AppendServerPid]), put('$ancestors', [AppendServerPid]),
put('$initial_call', {x,y,3}), put('$initial_call', {x,y,3}),
link(AppendServerPid), link(AppendServerPid),
run_seq_append_server(Prefix, DataDir) run_seq_append_server(Prefix, EpochID, DataDir)
end). end).
run_seq_append_server(Prefix, DataDir) -> run_seq_append_server(Prefix, EpochID, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()), true = register(machi_util:make_regname(Prefix), self()),
run_seq_append_server2(Prefix, DataDir). run_seq_append_server2(Prefix, EpochID, DataDir).
run_seq_append_server2(Prefix, DataDir) -> run_seq_append_server2(Prefix, EpochID, DataDir) ->
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
case machi_util:increment_max_filenum(DataDir, Prefix) of case machi_util:increment_max_filenum(DataDir, Prefix) of
ok -> ok ->
machi_util:increment_max_filenum(DataDir, Prefix),
machi_util:info_msg("start: ~p server at file ~w\n", machi_util:info_msg("start: ~p server at file ~w\n",
[Prefix, FileNum]), [Prefix, FileNum]),
seq_append_server_loop(DataDir, Prefix, FileNum); seq_append_server_loop(DataDir, Prefix, EpochID, FileNum);
Else -> Else ->
error_logger:error_msg("start: ~p server at file ~w: ~p\n", error_logger:error_msg("start: ~p server at file ~w: ~p\n",
[Prefix, FileNum, Else]), [Prefix, FileNum, Else]),
@ -750,7 +745,7 @@ seq_name_hack() ->
[element(3,now()), [element(3,now()),
list_to_integer(os:getpid())])). list_to_integer(os:getpid())])).
seq_append_server_loop(DataDir, Prefix, FileNum) -> seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) ->
SequencerNameHack = seq_name_hack(), SequencerNameHack = seq_name_hack(),
{File, FullPath} = machi_util:make_data_filename( {File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum), DataDir, Prefix, SequencerNameHack, FileNum),
@ -759,19 +754,22 @@ seq_append_server_loop(DataDir, Prefix, FileNum) ->
CSumPath = machi_util:make_checksum_filename( CSumPath = machi_util:make_checksum_filename(
DataDir, Prefix, SequencerNameHack, FileNum), DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHc} = file:open(CSumPath, [append, raw, binary]), {ok, FHc} = file:open(CSumPath, [append, raw, binary]),
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum, seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum,
?MINIMUM_OFFSET). ?MINIMUM_OFFSET).
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset) seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID,
FileNum, Offset)
when Offset > ?MAX_FILE_SIZE -> when Offset > ?MAX_FILE_SIZE ->
ok = file:close(FHd), ok = file:close(FHd),
ok = file:close(FHc), ok = file:close(FHc),
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]), [Prefix, FileNum, Offset]),
run_seq_append_server2(Prefix, DataDir); run_seq_append_server2(Prefix, EpochID, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID,
FileNum, Offset) ->
receive receive
{seq_append, From, Prefix, Chunk, TaggedCSum, Extra} -> {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID}
when R_EpochID == EpochID ->
if Chunk /= <<>> -> if Chunk /= <<>> ->
ok = file:pwrite(FHd, Offset, Chunk); ok = file:pwrite(FHd, Offset, Chunk);
true -> true ->
@ -781,12 +779,20 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
Size = iolist_size(Chunk), Size = iolist_size(Chunk),
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
ok = file:write(FHc, CSum_info), ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_, seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
FileNum, Offset + Size + Extra); FileNum, Offset + Size + Extra);
{seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG ->
%% Rare'ish event: send MSG to myself so it doesn't get lost
%% while we recurse around to pick up a new FileNum.
self() ! MSG,
machi_util:info_msg("rollover: ~p server at file ~w offset ~w "
"by new epoch_id ~W\n",
[Prefix, FileNum, Offset, R_EpochID, 8]),
run_seq_append_server2(Prefix, R_EpochID, DataDir);
{sync_stuff, FromPid, Ref} -> {sync_stuff, FromPid, Ref} ->
file:sync(FHc), file:sync(FHc),
FromPid ! {sync_finished, Ref}, FromPid ! {sync_finished, Ref},
seq_append_server_loop(DataDir, Prefix, File, FH_, seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
FileNum, Offset) FileNum, Offset)
after 30*1000 -> after 30*1000 ->
ok = file:close(FHd), ok = file:close(FHd),
@ -843,7 +849,7 @@ http_hack_server_put(Sock, G, FluName, MyURI) ->
throw({bad_csum, XX}) throw({bad_csum, XX})
end end
end, end,
FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0} FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0, todo_epoch_id_bitrot}
catch catch
throw:{bad_csum, _CS} -> throw:{bad_csum, _CS} ->
Out = "HTTP/1.0 412 Precondition failed\r\n" Out = "HTTP/1.0 412 Precondition failed\r\n"

View file

@ -132,7 +132,7 @@ increment_max_filenum(DataDir, Prefix) ->
try try
{ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]), {ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
ok = file:write(FH, "x"), ok = file:write(FH, "x"),
%% ok = file:sync(FH), ok = file:sync(FH),
ok = file:close(FH) ok = file:close(FH)
catch catch
error:{badmatch,_}=Error -> error:{badmatch,_}=Error ->

View file

@ -99,6 +99,7 @@ partial_stop_restart2() ->
{ok, {false, EpochID1}} = WedgeStatus(hd(Ps)), {ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
[{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged [{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged [{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
{ok, {_,_,File1}} = Append(hd(Ps), EpochID1),
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
[begin [begin
@ -126,6 +127,13 @@ partial_stop_restart2() ->
P <- Ps], P <- Ps],
{ok, {false, EpochID2}} = WedgeStatus(hd(Ps)), {ok, {false, EpochID2}} = WedgeStatus(hd(Ps)),
[{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged [{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged
%% The file we're assigned should be different with the epoch change.
{ok, {_,_,File2}} = Append(hd(Ps), EpochID2),
true = (File1 /= File2),
%% If we use the old epoch, then we're told that it's bad
{error, bad_epoch} = Append(hd(Ps), EpochID1),
%% If we use the current epoch again, then it's OK and given same File2
{ok, {_,_,File2}} = Append(hd(Ps), EpochID2),
%% Stop all but 'a'. %% Stop all but 'a'.
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
@ -156,6 +164,9 @@ partial_stop_restart2() ->
hd(ChMgrs)), hd(ChMgrs)),
true = (Epoch_n > Epoch_m), true = (Epoch_n > Epoch_m),
{ok, {false, EpochID3}} = WedgeStatus(hd(Ps)), {ok, {false, EpochID3}} = WedgeStatus(hd(Ps)),
%% The file we're assigned should be different with the epoch change.
{ok, {_,_,File3}} = Append(hd(Ps), EpochID3),
true = (File2 /= File3),
%% Confirm that 'a' is *not* wedged %% Confirm that 'a' is *not* wedged
{ok, _} = Append(hd(Ps), EpochID3), {ok, _} = Append(hd(Ps), EpochID3),