WIP: narrowing in on repair problems due to double-write errors 2
This commit is contained in:
parent
a7f42d636e
commit
3bd575899f
5 changed files with 45 additions and 16 deletions
|
@ -1909,7 +1909,7 @@ react_to_env_C100_inner(Author_latest, NotSanesDict0, _MyName,
|
|||
S2 = S#ch_mgr{not_sanes=NotSanesDict, sane_transitions=0},
|
||||
case orddict:fetch(Author_latest, NotSanesDict) of
|
||||
N when N > ?TOO_FREQUENT_BREAKER ->
|
||||
%% ?V("\n\nYOYO ~w breaking the cycle of:\n current: ~w\n new : ~w\n", [_MyName, machi_projection:make_summary(S#ch_mgr.proj), machi_projection:make_summary(P_latest)]),
|
||||
?V("\n\nYOYO ~w breaking the cycle insane-freq=~w by-author=~w of:\n current: ~w\n new : ~w\n", [_MyName, N, Author_latest, machi_projection:make_summary(S#ch_mgr.proj), machi_projection:make_summary(P_latest)]),
|
||||
?REACT({c100, ?LINE, [{not_sanes_author_count, N}]}),
|
||||
react_to_env_C103(P_newprop, P_latest, P_current_calc, S2);
|
||||
N ->
|
||||
|
@ -1937,7 +1937,8 @@ react_to_env_C103(#projection_v1{epoch_number=_Epoch_newprop} = _P_newprop,
|
|||
?REACT({c103, ?LINE,
|
||||
[{current_epoch, P_current#projection_v1.epoch_number},
|
||||
{none_projection_epoch, P_none#projection_v1.epoch_number}]}),
|
||||
io:format(user, "SET add_admin_down(~w) at ~w =====================================\n", [MyName, time()]),
|
||||
io:format(user, "SET add_admin_down(~w) at ~w TODO current_epoch ~w none_proj_epoch ~w =====================================\n", [MyName, time(), P_current#projection_v1.epoch_number, P_none#projection_v1.epoch_number]),
|
||||
%% io:format(user, "SET add_admin_down(~w) at ~w =====================================\n", [MyName, time()]),
|
||||
machi_fitness:add_admin_down(S#ch_mgr.fitness_svr, MyName, []),
|
||||
timer:sleep(5*1000),
|
||||
io:format(user, "SET delete_admin_down(~w) at ~w =====================================\n", [MyName, time()]),
|
||||
|
@ -2985,9 +2986,9 @@ perhaps_verbose_c111(P_latest2, S) ->
|
|||
true ->
|
||||
ok
|
||||
end,
|
||||
%% TODO put me back: case proplists:get_value(private_write_verbose,
|
||||
%% S#ch_mgr.opts) of
|
||||
case true of
|
||||
case proplists:get_value(private_write_verbose,
|
||||
S#ch_mgr.opts) of
|
||||
%% case true of
|
||||
true when Summ2 /= Last2 ->
|
||||
put(last_verbose, Summ2),
|
||||
?V("\n~s ~p uses plain: ~w \n",
|
||||
|
|
|
@ -299,16 +299,20 @@ do_append_head3(NSInfo, Prefix,
|
|||
case ?FLU_PC:append_chunk(Proxy, NSInfo, EpochID,
|
||||
Prefix, Chunk, CSum, Opts, ?TIMEOUT) of
|
||||
{ok, {Offset, _Size, File}=_X} ->
|
||||
io:format(user, "CLNT append_chunk: head ~w ok\n ~p\n hd ~p rest ~p epoch ~P\n", [HeadFLU, _X, HeadFLU, RestFLUs, EpochID, 8]),
|
||||
do_wr_app_midtail(RestFLUs, NSInfo, Prefix,
|
||||
File, Offset, Chunk, CSum, Opts,
|
||||
[HeadFLU], 0, STime, TO, append, S);
|
||||
{error, bad_checksum}=BadCS ->
|
||||
io:format(user, "CLNT append_chunk: head ~w BAD CS\n", [HeadFLU]),
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
io:format(user, "CLNT append_chunk: head ~w error ~p\n", [HeadFLU, Retry]),
|
||||
do_append_head(NSInfo, Prefix,
|
||||
Chunk, CSum, Opts, Depth, STime, TO, S);
|
||||
{error, written} ->
|
||||
io:format(user, "CLNT append_chunk: head ~w Written\n", [HeadFLU]),
|
||||
%% Implicit sequencing + this error = we don't know where this
|
||||
%% written block is. But we lost a race. Repeat, with a new
|
||||
%% sequencer assignment.
|
||||
|
@ -387,26 +391,32 @@ do_wr_app_midtail2([FLU|RestFLUs]=FLUs, NSInfo,
|
|||
CSum, Opts, Ws, Depth, STime, TO, MyOp,
|
||||
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
|
||||
Proxy = orddict:fetch(FLU, PD),
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w\n", [FLU]),
|
||||
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of
|
||||
ok ->
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w ok\n", [FLU]),
|
||||
do_wr_app_midtail2(RestFLUs, NSInfo, Prefix,
|
||||
File, Offset, Chunk,
|
||||
CSum, Opts, [FLU|Ws], Depth, STime, TO, MyOp, S);
|
||||
{error, bad_checksum}=BadCS ->
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w BAD CS\n", [FLU]),
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w error ~p\n", [FLU, Retry]),
|
||||
do_wr_app_midtail(FLUs, NSInfo, Prefix,
|
||||
File, Offset, Chunk,
|
||||
CSum, Opts, Ws, Depth, STime, TO, MyOp, S);
|
||||
{error, written} ->
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w WRITTEN\n", [FLU]),
|
||||
%% We know what the chunk ought to be, so jump to the
|
||||
%% middle of read-repair.
|
||||
Resume = {append, Offset, iolist_size(Chunk), File},
|
||||
do_repair_chunk(FLUs, Resume, Chunk, CSum, [], NSInfo, File, Offset,
|
||||
iolist_size(Chunk), Depth, STime, S);
|
||||
{error, trimmed} = Err ->
|
||||
io:format(user, "CLNT append_chunk: mid/tail ~w TRIMMED\n", [FLU]),
|
||||
%% TODO: nothing can be done
|
||||
{reply, Err, S};
|
||||
{error, not_written} ->
|
||||
|
@ -933,6 +943,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict,
|
|||
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),
|
||||
%% Make crash reports shorter by getting rid of 'react' history.
|
||||
P2 = P#projection_v1{dbg2=lists:keydelete(react, 1, Dbg2)},
|
||||
io:format(user, "CLNT PROJ: epoch ~p ~P upi ~w ~w\n", [P2#projection_v1.epoch_number, P2#projection_v1.epoch_csum, 6, P2#projection_v1.upi, P2#projection_v1.repairing]),
|
||||
S#state{bad_proj=undefined, proj=P2, epoch_id=EpochID,
|
||||
members_dict=NewMembersDict, proxies_dict=NewProxiesDict};
|
||||
_P ->
|
||||
|
|
|
@ -120,6 +120,7 @@ handle_call(Else, From, S) ->
|
|||
handle_cast({wedge_myself, WedgeEpochId},
|
||||
#state{flu_name=FluName, wedged=Wedged_p, epoch_id=OldEpochId}=S) ->
|
||||
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
|
||||
io:format(user, "FLU WEDGE 2: ~w : ~w ~P\n", [S#state.flu_name, true, OldEpochId, 6]),
|
||||
true = ets:insert(S#state.etstab,
|
||||
{epoch, {true, OldEpochId}}),
|
||||
%% Tell my chain manager that it might want to react to
|
||||
|
@ -138,6 +139,7 @@ handle_cast({wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId},
|
|||
undefined -> -1
|
||||
end,
|
||||
if NewEpoch >= OldEpoch ->
|
||||
io:format(user, "FLU WEDGE 1: ~w : ~w ~P\n", [S#state.flu_name, Boolean, NewEpochId, 6]),
|
||||
true = ets:insert(S#state.etstab,
|
||||
{epoch, {Boolean, NewEpochId}}),
|
||||
{noreply, S#state{wedged=Boolean, epoch_id=NewEpochId}};
|
||||
|
@ -177,8 +179,13 @@ handle_append(NSInfo,
|
|||
Prefix, Chunk, TCSum, Opts, FluName, EpochId) ->
|
||||
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(
|
||||
FluName, EpochId, {prefix, Prefix}, NSInfo),
|
||||
io:format(user, "FLU NAME: ~w + ~p got ~p\n", [FluName, Prefix, Res]),
|
||||
case Res of
|
||||
{file, F} ->
|
||||
case re:run(F, atom_to_list(FluName) ++ ",") of
|
||||
nomatch ->
|
||||
io:format(user, "\n\n\t\tBAAAAAAA\n\n", []), timer:sleep(50), erlang:halt(0);
|
||||
_ -> ok end,
|
||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
|
||||
{ok, Pid} ->
|
||||
{Tag, CS} = machi_util:unmake_tagged_csum(TCSum),
|
||||
|
|
|
@ -101,7 +101,7 @@ find_or_make_filename_from_prefix(FluName, EpochId,
|
|||
#ns_info{}=NSInfo)
|
||||
when is_atom(FluName) ->
|
||||
N = make_filename_mgr_name(FluName),
|
||||
gen_server:call(N, {find_filename, EpochId, NSInfo, Prefix}, ?TIMEOUT);
|
||||
gen_server:call(N, {find_filename, FluName, EpochId, NSInfo, Prefix}, ?TIMEOUT);
|
||||
find_or_make_filename_from_prefix(_FluName, _EpochId, Other, Other2) ->
|
||||
lager:error("~p is not a valid prefix/locator ~p", [Other, Other2]),
|
||||
error(badarg).
|
||||
|
@ -143,18 +143,19 @@ handle_cast(Req, State) ->
|
|||
%% the FLU has already validated that the caller's epoch id and the FLU's epoch id
|
||||
%% are the same. So we *assume* that remains the case here - that is to say, we
|
||||
%% are not wedged.
|
||||
handle_call({find_filename, EpochId, NSInfo, Prefix}, _From, S = #state{ datadir = DataDir,
|
||||
epoch = EpochId,
|
||||
tid = Tid }) ->
|
||||
handle_call({find_filename, FluName, EpochId, NSInfo, Prefix}, _From,
|
||||
S = #state{ datadir = DataDir, epoch = EpochId, tid = Tid }) ->
|
||||
%% Our state and the caller's epoch ids are the same. Business as usual.
|
||||
File = handle_find_file(Tid, NSInfo, Prefix, DataDir),
|
||||
io:format(user, "FMGR ~w LINE ~p\n", [FluName, ?LINE]),
|
||||
File = handle_find_file(FluName, Tid, NSInfo, Prefix, DataDir),
|
||||
{reply, {file, File}, S};
|
||||
|
||||
handle_call({find_filename, EpochId, NSInfo, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
||||
handle_call({find_filename, FluName, EpochId, NSInfo, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
||||
%% If the epoch id in our state and the caller's epoch id were the same, it would've
|
||||
%% matched the above clause. Since we're here, we know that they are different.
|
||||
%% If epoch ids between our state and the caller's are different, we must increment the
|
||||
%% sequence number, generate a filename and then cache it.
|
||||
io:format(user, "FMGR ~w LINE ~p\n", [FluName, ?LINE]),
|
||||
File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix),
|
||||
{reply, {file, File}, S#state{epoch = EpochId}};
|
||||
|
||||
|
@ -205,13 +206,15 @@ list_files(DataDir, Prefix) ->
|
|||
make_filename_mgr_name(FluName) when is_atom(FluName) ->
|
||||
list_to_atom(atom_to_list(FluName) ++ "_filename_mgr").
|
||||
|
||||
handle_find_file(Tid, #ns_info{name=NS, locator=NSLocator}=NSInfo, Prefix, DataDir) ->
|
||||
handle_find_file(FluName, Tid, #ns_info{name=NS, locator=NSLocator}=NSInfo, Prefix, DataDir) ->
|
||||
N = machi_util:read_max_filenum(DataDir, NS, NSLocator, Prefix),
|
||||
{File, Cleanup} = case find_file(DataDir, NSInfo, Prefix, N) of
|
||||
[] ->
|
||||
io:format(user, "HFF: 1\n", []),
|
||||
{find_or_make_filename(Tid, DataDir, NS, NSLocator, Prefix, N), false};
|
||||
[H] -> {H, true};
|
||||
[H] -> io:format(user, "HFF: 2 ~s\n", [H]),{H, true};
|
||||
[Fn | _ ] = L ->
|
||||
io:format(user, "HFF: 3 ~p\n", [L]),
|
||||
lager:debug(
|
||||
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
|
||||
[Prefix, N, L]),
|
||||
|
@ -232,7 +235,11 @@ find_or_make_filename(Tid, DataDir, NS, NSLocator, Prefix, N) ->
|
|||
|
||||
generate_filename(DataDir, NS, NSLocator, Prefix, N) ->
|
||||
{A,B,C} = erlang:now(),
|
||||
TODO = lists:flatten(filename:basename(DataDir) ++ "," ++ io_lib:format("~w,~w,~w", [A,B,C])),
|
||||
RN = case process_info(self(), registered_name) of
|
||||
[] -> [];
|
||||
{_,X} -> re:replace(atom_to_list(X), "_.*", "", [{return, binary}])
|
||||
end,
|
||||
TODO = lists:flatten([RN, ",", io_lib:format("~w,~w,~w", [A,B,C])]),
|
||||
{F, _} = machi_util:make_data_filename(
|
||||
DataDir,
|
||||
NS, NSLocator, Prefix,
|
||||
|
|
|
@ -196,12 +196,15 @@ change_partition(Partition,
|
|||
%% Don't wait for stable chain, tick will be executed on demand
|
||||
%% in append oprations
|
||||
_ = tick(S),
|
||||
|
||||
ok.
|
||||
|
||||
%% Generators
|
||||
|
||||
num() ->
|
||||
choose(2, 5).
|
||||
2.
|
||||
%% TODO:put me back
|
||||
%% choose(2, 5).
|
||||
|
||||
cr_count(Num) ->
|
||||
Num * 3.
|
||||
|
|
Loading…
Reference in a new issue