diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index bc3d1e6..be4aacb 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -38,7 +38,7 @@ %% File API append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, -%% read_chunk/5, read_chunk/6, + read_chunk/4, read_chunk/5, %% checksum_list/3, checksum_list/4, %% list_files/2, list_files/3, %% wedge_status/1, wedge_status/2, @@ -66,7 +66,7 @@ -define(FLU_PC, machi_proxy_flu1_client). -define(TIMEOUT, 2*1000). --define(MAX_RUNTIME, 5*1000). +-define(MAX_RUNTIME, 8*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -112,14 +112,14 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> %% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -%% read_chunk(PidSpec, EpochID, File, Offset, Size) -> -%% read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). +read_chunk(PidSpec, File, Offset, Size) -> + read_chunk(PidSpec, File, Offset, Size, infinity). -%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. +%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -%% read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> -%% gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, -%% Timeout). +read_chunk(PidSpec, File, Offset, Size, Timeout) -> + gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}}, + Timeout). %% %% @doc Fetch the list of chunk checksums for `File'. @@ -268,6 +268,7 @@ handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) -> @@ -280,12 +281,14 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S). + do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); +handle_call2({read_chunk, File, Offset, Size}, _From, S) -> + do_read_chunk(File, Offset, Size, 0, os:timestamp(), S). do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> - io:format(user, "append sleep1,", []), + %% io:format(user, "head sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> @@ -300,10 +303,10 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, - STime, S); + STime, S2); _ -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, - STime, S) + STime, S2) end end. @@ -311,13 +314,11 @@ do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> [HeadFLU|RestFLUs] = mutation_flus(P), Proxy = orddict:fetch(HeadFLU, PD), - io:format(user, "append ~w,", [Proxy]), case ?FLU_PC:append_chunk_extra(Proxy, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - %% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]), - io:format(user, "append ~w,", [HeadFLU]), + %% io:format(user, "append ~w,", [HeadFLU]), do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [HeadFLU], 0, STime, S); {error, Retry} @@ -338,9 +339,9 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, when RestFLUs == [] orelse Depth == 0 -> do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth + 1, STime, S); -do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, +do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{proj=P}=S) -> - io:format(user, "append sleep2,", []), + %% io:format(user, "midtail sleep2,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> @@ -353,7 +354,7 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, P2 -> RestFLUs2 = mutation_flus(P2), case RestFLUs2 -- Ws of - RestFLUs -> + RestFLUs2 -> %% None of the writes that we have done so far %% are to FLUs that are in the RestFLUs2 list. %% We are pessimistic here and assume that @@ -362,28 +363,26 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, %% the 2nd have of the impl (we have already %% slept & refreshed the projection). do_append_head2(Prefix, Chunk, ChunkExtra, Depth, - STime, S); + STime, S2); RestFLUs3 -> do_append_midtail2(RestFLUs3, Prefix, File, Offset, Chunk, ChunkExtra, - Ws, Depth + 1, STime, S) + Ws, Depth + 1, STime, S2) end end end. do_append_midtail2([], _Prefix, File, Offset, Chunk, _ChunkExtra, _Ws, _Depth, _STime, S) -> - io:format(user, "ok!\n", []), + %% io:format(user, "ok!\n", []), {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(FLU, PD), - io:format(user, "write ~w,", [Proxy]), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> - %% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]), - io:format(user, "write ~w,", [FLU]), + %% io:format(user, "write ~w,", [FLU]), do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [FLU|Ws], Depth, STime, S); {error, Retry} @@ -392,25 +391,149 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, S); {error, written} -> exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)}) - %% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing, - %% Chunk, Depth, STime, S) - %% Chunk1 = if is_binary(Chunk) -> Chunk; - %% is_list(Chunk) -> list_to_binary(Chunk) - %% end, - %% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, - %% size(Chunk1), ?TIMEOUT) of - %% {ok, Chunk2} when Chunk2 == Chunk1 -> - %% %% Someone has been read-repairing this chunk. - %% %% Keep going. - %% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - %% OldHeadFLU, OkCount + 1, S); - %% {error, not_written} -> - %% exit({todo_should_never_happen,?MODULE,?LINE, - %% File,Offset, size(Chunk1)}); - %% { -> %% TODO return values here end. +do_read_chunk(File, Offset, Size, 0=Depth, STime, + #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + do_read_chunk2(File, Offset, Size, Depth, STime, S); +do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_read_chunk(File, Offset, Size, Depth + 1, STime, S2); + _ -> + do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2) + end + end. + +do_read_chunk2(File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + UPI = readonly_flus(P), + Head = hd(UPI), + Tail = lists:last(UPI), + case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + {{ok, Chunk}, S}; + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_read_chunk(File, Offset, Size, Depth, STime, S); + {error, not_written} when Tail == Head -> + {{error, not_written}, S}; + {error, not_written} when Tail /= Head -> + read_repair(read, File, Offset, Size, Depth, STime, S); + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime, + #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S); +read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read_repair sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair(ReturnMode, File, Offset, Size, + Depth + 1, STime, S2); + _ -> + read_repair2(ReturnMode, File, Offset, Size, + Depth + 1, STime, S2) + end + end. + +read_repair2(ReturnMode, File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + [Head|MidsTails] = readonly_flus(P), + case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset, + Size, Depth, STime, S); + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair(ReturnMode, File, Offset, Size, Depth, STime, S); + {error, not_written} -> + {{error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S) -> + read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); +read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, 0=Depth, STime, S) -> + read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); +read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset, + Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read_repair3 sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2); + P2 -> + MidsTails2 = P2#projection_v1.upi -- Repaired, + read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2) + end + end. + +read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S) -> + case ReturnMode of + read -> + {reply, {ok, Chunk}, S} + end; +read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(First, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, written} -> + %% TODO: To be very paranoid, read the chunk here to verify + %% that it is exactly our Chunk. + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); update_proj(S) -> @@ -444,7 +567,6 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, members_dict=NewMembersDict} = P, - io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]), EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index c7bdc8d..31fe56d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -292,8 +292,17 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> _EpochID = decode_epoch_id(EpochIDHex), + %% do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, + %% <<"fixme1">>, false, <<"fixme2">>); + if FluName == a -> do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, <<"fixme1">>, false, <<"fixme2">>); + true -> + ok = inet:setopts(Sock, [{packet, raw}]), + Len = machi_util:hexstr_to_int(LenHex), + {ok, Chunk} = gen_tcp:recv(Sock, Len), + ok = gen_tcp:send(Sock, <<"OK\n">>) + end; %% For data migration only. <<"DEL-migration ", EpochIDHex:(?EpochIDSpace)/binary, @@ -432,6 +441,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), OptsHasWrite = lists:member(write, FileOpts), + OptsHasRead = lists:member(read, FileOpts), case file:open(Path, FileOpts) of {ok, FH} -> try @@ -444,8 +454,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, EpochID, Wedged_p, CurrentEpochId); + {error, enoent} when OptsHasRead -> + ok = gen_tcp:send(Sock, <<"ERROR NO-SUCH-FILE\n">>); _Else -> - %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) end. diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 611e3d3..002b5f3 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -515,8 +515,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); <<"ERR">> -> case Else2 of - <<"OR BAD-IO\n">> -> - {error, no_such_file}; + <<"OR NO-SUCH-FILE\n">> -> + {error, not_written}; <<"OR NOT-ERASURE\n">> -> {error, no_such_file}; <<"OR BAD-ARG\n">> -> diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 79cff36..3d0100e 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -279,6 +279,7 @@ handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, _S) ->