WIP: brute-force read-repair

This commit is contained in:
Scott Lystig Fritchie 2015-05-18 23:26:21 +09:00
parent 185c670b2f
commit f7274e7106
4 changed files with 178 additions and 44 deletions

View file

@ -38,7 +38,7 @@
%% File API
append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5,
%% read_chunk/5, read_chunk/6,
read_chunk/4, read_chunk/5,
%% checksum_list/3, checksum_list/4,
%% list_files/2, list_files/3,
%% wedge_status/1, wedge_status/2,
@ -66,7 +66,7 @@
-define(FLU_PC, machi_proxy_flu1_client).
-define(TIMEOUT, 2*1000).
-define(MAX_RUNTIME, 5*1000).
-define(MAX_RUNTIME, 8*1000).
-record(state, {
members_dict :: p_srvr_dict(),
@ -112,14 +112,14 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
%% read_chunk(PidSpec, EpochID, File, Offset, Size) ->
%% read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, infinity).
%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
%% read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
%% gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
%% Timeout).
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}},
Timeout).
%% %% @doc Fetch the list of chunk checksums for `File'.
@ -268,6 +268,7 @@ handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
{noreply, S}.
terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) ->
@ -280,12 +281,14 @@ code_change(_OldVsn, S, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S).
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), S).
do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) ->
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S);
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
io:format(user, "append sleep1,", []),
%% io:format(user, "head sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
@ -300,10 +303,10 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
STime, S);
STime, S2);
_ ->
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
STime, S)
STime, S2)
end
end.
@ -311,13 +314,11 @@ do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
io:format(user, "append ~w,", [Proxy]),
case ?FLU_PC:append_chunk_extra(Proxy,
EpochID, Prefix, Chunk, ChunkExtra,
?TIMEOUT) of
{ok, {Offset, _Size, File}=_X} ->
%% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]),
io:format(user, "append ~w,", [HeadFLU]),
%% io:format(user, "append ~w,", [HeadFLU]),
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
[HeadFLU], 0, STime, S);
{error, Retry}
@ -338,9 +339,9 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
when RestFLUs == [] orelse Depth == 0 ->
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth + 1, STime, S);
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth, STime, #state{proj=P}=S) ->
io:format(user, "append sleep2,", []),
%% io:format(user, "midtail sleep2,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
@ -353,7 +354,7 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
P2 ->
RestFLUs2 = mutation_flus(P2),
case RestFLUs2 -- Ws of
RestFLUs ->
RestFLUs2 ->
%% None of the writes that we have done so far
%% are to FLUs that are in the RestFLUs2 list.
%% We are pessimistic here and assume that
@ -362,28 +363,26 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
%% the 2nd have of the impl (we have already
%% slept & refreshed the projection).
do_append_head2(Prefix, Chunk, ChunkExtra, Depth,
STime, S);
STime, S2);
RestFLUs3 ->
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
Chunk, ChunkExtra,
Ws, Depth + 1, STime, S)
Ws, Depth + 1, STime, S2)
end
end
end.
do_append_midtail2([], _Prefix, File, Offset, Chunk,
_ChunkExtra, _Ws, _Depth, _STime, S) ->
io:format(user, "ok!\n", []),
%% io:format(user, "ok!\n", []),
{reply, {ok, {Offset, iolist_size(Chunk), File}}, S};
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
ChunkExtra, Ws, Depth, STime,
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(FLU, PD),
io:format(user, "write ~w,", [Proxy]),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
%% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]),
io:format(user, "write ~w,", [FLU]),
%% io:format(user, "write ~w,", [FLU]),
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
ChunkExtra, [FLU|Ws], Depth, STime, S);
{error, Retry}
@ -392,25 +391,149 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
ChunkExtra, Ws, Depth, STime, S);
{error, written} ->
exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)})
%% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing,
%% Chunk, Depth, STime, S)
%% Chunk1 = if is_binary(Chunk) -> Chunk;
%% is_list(Chunk) -> list_to_binary(Chunk)
%% end,
%% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset,
%% size(Chunk1), ?TIMEOUT) of
%% {ok, Chunk2} when Chunk2 == Chunk1 ->
%% %% Someone has been read-repairing this chunk.
%% %% Keep going.
%% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
%% OldHeadFLU, OkCount + 1, S);
%% {error, not_written} ->
%% exit({todo_should_never_happen,?MODULE,?LINE,
%% File,Offset, size(Chunk1)});
%% { ->
%% TODO return values here
end.
do_read_chunk(File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth, STime, S);
do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{error, partition};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_read_chunk(File, Offset, Size, Depth + 1, STime, S2);
_ ->
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2)
end
end.
do_read_chunk2(File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
UPI = readonly_flus(P),
Head = hd(UPI),
Tail = lists:last(UPI),
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{{ok, Chunk}, S};
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_read_chunk(File, Offset, Size, Depth, STime, S);
{error, not_written} when Tail == Head ->
{{error, not_written}, S};
{error, not_written} when Tail /= Head ->
read_repair(read, File, Offset, Size, Depth, STime, S);
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S);
read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{error, partition};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
read_repair(ReturnMode, File, Offset, Size,
Depth + 1, STime, S2);
_ ->
read_repair2(ReturnMode, File, Offset, Size,
Depth + 1, STime, S2)
end
end.
read_repair2(ReturnMode, File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
[Head|MidsTails] = readonly_flus(P),
case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset,
Size, Depth, STime, S);
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair(ReturnMode, File, Offset, Size, Depth, STime, S);
{error, not_written} ->
{{error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) ->
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
Size, 0=Depth, STime, S) ->
read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{error, partition};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2);
P2 ->
MidsTails2 = P2#projection_v1.upi -- Repaired,
read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2)
end
end.
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) ->
case ReturnMode of
read ->
{reply, {ok, Chunk}, S}
end;
read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(First, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
{error, written} ->
%% TODO: To be very paranoid, read the chunk here to verify
%% that it is exactly our Chunk.
read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
update_proj(#state{proj=undefined}=S) ->
update_proj2(1, S);
update_proj(S) ->
@ -444,7 +567,6 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
P when P >= BadProj ->
#projection_v1{epoch_number=Epoch, epoch_csum=CSum,
members_dict=NewMembersDict} = P,
io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]),
EpochID = {Epoch, CSum},
?FLU_PC:stop_proxies(ProxiesDict),
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),

View file

@ -292,8 +292,17 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
%% do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
%% <<"fixme1">>, false, <<"fixme2">>);
if FluName == a ->
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
true ->
ok = inet:setopts(Sock, [{packet, raw}]),
Len = machi_util:hexstr_to_int(LenHex),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
ok = gen_tcp:send(Sock, <<"OK\n">>)
end;
%% For data migration only.
<<"DEL-migration ",
EpochIDHex:(?EpochIDSpace)/binary,
@ -432,6 +441,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
OptsHasWrite = lists:member(write, FileOpts),
OptsHasRead = lists:member(read, FileOpts),
case file:open(Path, FileOpts) of
{ok, FH} ->
try
@ -444,8 +454,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochID, Wedged_p, CurrentEpochId);
{error, enoent} when OptsHasRead ->
ok = gen_tcp:send(Sock, <<"ERROR NO-SUCH-FILE\n">>);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.

View file

@ -515,8 +515,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NO-SUCH-FILE\n">> ->
{error, not_written};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
<<"OR BAD-ARG\n">> ->

View file

@ -279,6 +279,7 @@ handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
{noreply, S}.
terminate(_Reason, _S) ->