diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl
index 594812d..6e48978 100644
--- a/src/machi_chain_manager1.erl
+++ b/src/machi_chain_manager1.erl
@@ -1930,9 +1930,12 @@ perhaps_start_repair(
     %% RepairOpts = [{repair_mode, check}, verbose],
     RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
     LastUPI = lists:last(UPI),
+    IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time,
+                                                S#ch_mgr.opts, false),
     case timer:now_diff(os:timestamp(), Start) div 1000000 of
-        N when MyName == LastUPI,
-               N >= ?REPAIR_START_STABILITY_TIME ->
+        N when MyName == LastUPI andalso
+               (IgnoreStabilityTime_p orelse
+                N >= ?REPAIR_START_STABILITY_TIME) ->
             {WorkerPid, _Ref} = spawn_monitor(RepairFun),
             S#ch_mgr{repair_worker=WorkerPid,
                      repair_start=os:timestamp(),
diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl
index 1ef772f..b890d80 100644
--- a/src/machi_chain_repair.erl
+++ b/src/machi_chain_repair.erl
@@ -107,8 +107,6 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
     catch
         What:Why ->
             Stack = erlang:get_stacktrace(),
-            io:format(user, "What Why ~p ~p @\n\t~p\n",
-                      [What, Why, Stack]),
             {error, {What, Why, Stack}}
     after
         [(catch machi_proxy_flu1_client:quit(Pid)) ||
diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl
index 7903816..1299c28 100644
--- a/src/machi_cr_client.erl
+++ b/src/machi_cr_client.erl
@@ -261,7 +261,7 @@ handle_call({req, Req}, From, S) ->
 handle_call(quit, _From, S) ->
     {stop, normal, ok, S};
 handle_call(_Request, _From, S) ->
-    Reply = ok,
+    Reply = whaaaaaaaaaaaaaaaaaaaa,
     {reply, Reply, S}.
 
 handle_cast(_Msg, S) ->
@@ -292,7 +292,7 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > ?MAX_RUNTIME ->
-            {error, partition};
+            {reply, {error, partition}, S};
        true ->
             %% This is suboptimal for performance: there are some paths
             %% through this point where our current projection is good
@@ -345,12 +345,12 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > ?MAX_RUNTIME ->
-            {error, partition};
+            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
                undefined ->
-                   {error, partition};
+                   {reply, {error, partition}, S};
                P2 ->
                    RestFLUs2 = mutation_flus(P2),
                    case RestFLUs2 -- Ws of
@@ -400,13 +400,13 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
 
 do_read_chunk(File, Offset, Size, 0=Depth, STime,
               #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
-    do_read_chunk2(File, Offset, Size, Depth, STime, S);
+    do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);
 do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
     %% io:format(user, "read sleep1,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > ?MAX_RUNTIME ->
-            {error, partition};
+            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
@@ -421,8 +421,8 @@ do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
 do_read_chunk2(File, Offset, Size, Depth, STime,
                #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
     UPI = readonly_flus(P),
-    Head = hd(UPI),
     Tail = lists:last(UPI),
+    ConsistencyMode = P#projection_v1.mode,
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset,
                             Size, ?TIMEOUT) of
         {ok, Chunk} when byte_size(Chunk) == Size ->
@@ -433,52 +433,84 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
             do_read_chunk(File, Offset, Size, Depth, STime, S);
-        {error, not_written} when Tail == Head ->
-            {{error, not_written}, S};
-        {error, not_written} when Tail /= Head ->
-            read_repair(read, File, Offset, Size, Depth, STime, S);
+        {error, not_written} ->
+            read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
+            %% {reply, {error, not_written}, S};
         {error, written} ->
             exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
     end.
 
-read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime,
-            #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
-    read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S);
-read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
-    %% io:format(user, "read_repair sleep1,", []),
+%% Read repair: depends on the consistency mode that we're in:
+%%
+%% CP mode: If the head is written, then use it to repair UPI++Repairing.
+%%          If head is not_written, then do nothing.
+%% AP mode: If any FLU in UPI++Repairing is written, then use it to repair
+%%          UPI++Repairing.
+%%          If all FLUs in UPI++Repairing are not_written, then do nothing.
+
+read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth,
+            STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
+    read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1,
+                 STime, S);
+read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth,
+            STime, #state{proj=P}=S) ->
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > ?MAX_RUNTIME ->
-            {error, partition};
+            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
                P2 when P2 == undefined
                        orelse P2#projection_v1.upi == [] ->
-                   read_repair(ReturnMode, File, Offset, Size,
-                               Depth + 1, STime, S2);
+                   read_repair(ConsistencyMode, ReturnMode, File, Offset,
+                               Size, Depth + 1, STime, S2);
                _ ->
-                   read_repair2(ReturnMode, File, Offset, Size,
-                                Depth + 1, STime, S2)
+                   read_repair2(ConsistencyMode, ReturnMode, File, Offset,
+                                Size, Depth + 1, STime, S2)
            end
    end.
 
-read_repair2(ReturnMode, File, Offset, Size, Depth, STime,
+read_repair2(cp_mode=ConsistencyMode,
+             ReturnMode, File, Offset, Size, Depth, STime,
              #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
-    [Head|MidsTails] = readonly_flus(P),
-    case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID,
+    Tail = lists:last(readonly_flus(P)),
+    case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
                             File, Offset, Size, ?TIMEOUT) of
         {ok, Chunk} when byte_size(Chunk) == Size ->
-            read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset,
+            ToRepair = mutation_flus(P) -- [Tail],
+            read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
                          Size, Depth, STime, S);
         {ok, BadChunk} ->
-            exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
-                  got, byte_size(BadChunk)});
+            exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
+                  Size, got, byte_size(BadChunk)});
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            read_repair(ReturnMode, File, Offset, Size, Depth, STime, S);
+            read_repair(ConsistencyMode, ReturnMode, File, Offset,
+                        Size, Depth, STime, S);
         {error, not_written} ->
-            {{error, not_written}, S};
+            {reply, {error, not_written}, S};
+        {error, written} ->
+            exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
+    end;
+read_repair2(ap_mode=ConsistencyMode,
+             ReturnMode, File, Offset, Size, Depth, STime,
+             #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
+    Eligible = mutation_flus(P),
+    case try_to_find_chunk(Eligible, File, Offset, Size, S) of
+        {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
+            ToRepair = mutation_flus(P) -- [GotItFrom],
+            read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
+                         Offset, Size, Depth, STime, S);
+        {ok, BadChunk} ->
+            exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
+                  Offset, Size, got, byte_size(BadChunk)});
+        {error, Retry}
+          when Retry == partition; Retry == bad_epoch; Retry == wedged ->
+            read_repair(ConsistencyMode, ReturnMode, File,
+                        Offset, Size, Depth, STime, S);
+        {error, not_written} ->
+            {reply, {error, not_written}, S};
         {error, written} ->
             exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
     end.
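(Editor's aside, not part of the patch: the two read_repair2/8 clauses above encode the rule stated in the new comment block -- CP mode trusts only one designated copy in the chain, while AP mode takes the first written copy found anywhere in UPI ++ Repairing and repairs every other FLU. A minimal, self-contained Erlang sketch of just that AP-mode selection rule follows; the module and function names here are hypothetical and do not exist in Machi.)

%% Sketch only: ap_repair_sketch and pick_source/1 are hypothetical names,
%% not part of this patch or of Machi.
-module(ap_repair_sketch).
-export([pick_source/1]).

%% Results is [{FluName, {ok, Chunk} | {error, Reason}}], one entry per FLU
%% in UPI ++ Repairing.  First written copy wins; all other FLUs get repaired.
pick_source(Results) ->
    case [{F, Chunk} || {F, {ok, Chunk}} <- Results, is_binary(Chunk)] of
        [{GotItFrom, Chunk}|_] ->
            ToRepair = [F || {F, _} <- Results] -- [GotItFrom],
            {GotItFrom, Chunk, ToRepair};
        [] ->
            not_written
    end.

%% Example: pick_source([{a,{error,not_written}}, {b,{ok,<<"data">>}},
%%                       {c,{error,partition}}]) returns {b, <<"data">>, [a,c]}.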
@@ -487,27 +519,27 @@ read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
              Size, Depth, STime, S) ->
     read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
                  Size, Depth, STime, S);
-read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
+read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
              Size, 0=Depth, STime, S) ->
-    read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
-                 Size, Depth, STime, S);
+    read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
+                 Size, Depth + 1, STime, S);
-read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset,
+read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
              Size, Depth, STime, #state{proj=P}=S) ->
     %% io:format(user, "read_repair3 sleep1,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > ?MAX_RUNTIME ->
-            {error, partition};
+            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
                P2 when P2 == undefined
                        orelse P2#projection_v1.upi == [] ->
-                   read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File,
+                   read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
                                 Offset, Size, Depth + 1, STime, S2);
                P2 ->
-                   MidsTails2 = P2#projection_v1.upi -- Repaired,
-                   read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File,
+                   ToRepair2 = mutation_flus(P2) -- Repaired,
+                   read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File,
                                 Offset, Size, Depth + 1, STime, S2)
            end
    end.
@@ -518,7 +550,7 @@ read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
         read ->
             {reply, {ok, Chunk}, S}
     end;
-read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
+read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
              Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(First, PD),
     case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
@@ -527,7 +559,7 @@ read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
                          Offset, Size, Depth, STime, S);
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File,
+            read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
                          Offset, Size, Depth, STime, S);
         {error, written} ->
             %% TODO: To be very paranoid, read the chunk here to verify
@@ -545,22 +577,12 @@ update_proj(S) ->
 
 update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
     Timeout = 2*1000,
-    Parent = self(),
+    WTimeout = 2*Timeout,
     Proxies = orddict:to_list(ProxiesDict),
-    MiddleWorker = spawn(
-                     fun() ->
-                        PidsMons =
-                            [spawn_monitor(fun() ->
-                                   exit(catch ?FLU_PC:read_latest_projection(
-                                                Proxy, private, Timeout))
-                                           end) || {_K, Proxy} <- Proxies],
-                        Rs = gather_worker_statuses(PidsMons, Timeout*2),
-                        Parent ! {res, self(), Rs},
-                        exit(normal)
-                     end),
-    Rs = receive {res, MiddleWorker, Results} -> Results
-         after Timeout*2 -> []
-         end,
+    Work = fun({_K, Proxy}) ->
+                   ?FLU_PC:read_latest_projection(Proxy, private, Timeout)
+           end,
+    Rs = run_middleworker_job(Work, Proxies, WTimeout),
     %% TODO: There's a possible bug here when running multiple independent
     %% Machi clusters/chains. If our chain used to be [a,b], but our
     %% sysadmin has changed our cluster to be [a] and a completely seprate
@@ -581,6 +603,26 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
             update_proj2(Count + 1, S)
     end.
 
+run_middleworker_job(Fun, ArgList, WTimeout) ->
+    Parent = self(),
+    MiddleWorker =
+        spawn(fun() ->
+                  PidsMons =
+                      [spawn_monitor(fun() ->
+                                             Res = (catch Fun(Arg)),
+                                             exit(Res)
+                                     end) || Arg <- ArgList],
+                  Rs = gather_worker_statuses(PidsMons, WTimeout),
+                  Parent ! {res, self(), Rs},
+                  exit(normal)
+              end),
+    receive
+        {res, MiddleWorker, Results} ->
+            Results
+    after WTimeout+100 ->
+            []
+    end.
+
 gather_worker_statuses([], _Timeout) ->
     [];
 gather_worker_statuses([{Pid,Ref}|Rest], Timeout) ->
@@ -600,6 +642,34 @@ choose_best_proj(Rs) ->
                         BestEpoch
                 end, WorstEpoch, Rs).
 
+try_to_find_chunk(Eligible, File, Offset, Size,
+                  #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
+    Timeout = 2*1000,
+    Work = fun(FLU) ->
+                   Proxy = orddict:fetch(FLU, PD),
+                   case ?FLU_PC:read_chunk(Proxy, EpochID,
+                                           File, Offset, Size) of
+                       {ok, Chunk} when byte_size(Chunk) == Size ->
+                           {FLU, {ok, Chunk}};
+                       Else ->
+                           {FLU, Else}
+                   end
+           end,
+    Rs = run_middleworker_job(Work, Eligible, Timeout),
+    case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
+        [{FoundFLU, {ok, Chunk}}|_] ->
+            {ok, Chunk, FoundFLU};
+        [] ->
+            RetryErrs = [partition, bad_epoch, wedged],
+            case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
+                [SomeErr|_] ->
+                    {error, SomeErr};
+                [] ->
+                    %% TODO does this really work 100% of the time?
+                    {error, not_written}
+            end
+    end.
+
 mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) ->
     UPI ++ Repairing;
 mutation_flus(#state{proj=P}) ->
diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl
index ad3a2e1..b2d39a6 100644
--- a/test/machi_cr_client_test.erl
+++ b/test/machi_cr_client_test.erl
@@ -32,37 +32,73 @@ smoke_test() ->
     os:cmd("rm -rf ./data.a ./data.b ./data.c"),
     {ok, SupPid} = machi_flu_sup:start_link(),
+    error_logger:tty(false),
     try
         Prefix = <<"pre">>,
         Chunk1 = <<"yochunk">>,
         Host = "localhost",
         PortBase = 4444,
-        {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", []),
-        {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", []),
-        {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", []),
+        Os = [{ignore_stability_time, true}, {active_mode, false}],
+        {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", Os),
+        {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", Os),
+        {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", Os),
         D = orddict:from_list(
               [{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}},
                {b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}},
               {c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]),
+        %% Force the chain to repair & fully assemble as quickly as possible.
+        %% 1. Use set_chain_members() on all 3
+        %% 2. Force a to run repair in a tight loop
+        %% 3. Stop as soon as we see UPI=[a,b,c] and also author=c.
+        %%    Otherwise, we can have a race with later, minor
+        %%    projection changes which will change our understanding of
+        %%    the epoch id.  (C is the author with highest weight.)
+        %% 4. Wait until all others are using epoch id from #3.
+        %%
+        %% Damn, this is a pain to make 100% deterministic, bleh.
         ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
         ok = machi_chain_manager1:set_chain_members(b_chmgr, D),
         ok = machi_chain_manager1:set_chain_members(c_chmgr, D),
-        machi_projection_store:read_latest_projection(a_pstore, private),
+        TickAll = fun() -> [begin
+                                Pid ! tick_check_environment,
+                                timer:sleep(50)
+                            end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ]
+                  end,
+        _ = lists:foldl(
+              fun(_, [{c,[a,b,c]}]=Acc) -> Acc;
+                 (_, Acc) ->
+                      TickAll(),                % has some sleep time inside
+                      Xs = [begin
+                                {ok, Prj} = machi_projection_store:read_latest_projection(PStore, private),
+                                {Prj#projection_v1.author_server,
+                                 Prj#projection_v1.upi}
+                            end || PStore <- [a_pstore,b_pstore,c_pstore] ],
+                      lists:usort(Xs)
+              end, undefined, lists:seq(1,10000)),
+        %% Everyone is settled on the same damn epoch id.
+        {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0,
+                                                             private),
+        {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+1,
+                                                             private),
+        {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+2,
+                                                             private),
+        %% Whew ... ok, now start some damn tests.
         {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]),
         machi_cr_client:append_chunk(C1, Prefix, Chunk1),
         %% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)},
         {ok, {Off1,Size1,File1}} = machi_cr_client:append_chunk(C1, Prefix, Chunk1),
         {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
-        {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0,
+        {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
                                                              private),
         %% Verify that the client's CR wrote to all of them.
         [{ok, Chunk1} = machi_flu1_client:read_chunk(
                           Host, PortBase+X, EpochID, File1, Off1, Size1) || X <- [0,1,2] ],
-        %% Manually write to head, then verify that read-repair fixes all.
+        %% Test read repair: Manually write to head, then verify that
+        %% read-repair fixes all.
         FooOff1 = Off1 + (1024*1024),
         [{error, not_written} = machi_flu1_client:read_chunk(
                                   Host, PortBase+X, EpochID,
@@ -74,8 +110,20 @@ smoke_test() ->
                                   Host, PortBase+X, EpochID,
                                   File1, FooOff1, Size1)} || X <- [0,1,2] ],
+        %% Test read repair: Manually write to middle, then same checking.
+        FooOff2 = Off1 + (2*1024*1024),
+        Chunk2 = <<"Middle repair chunk">>,
+        Size2 = size(Chunk2),
+        ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
+                                           File1, FooOff2, Chunk2),
+        {ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
+        [{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
+                                 Host, PortBase+X, EpochID,
+                                 File1, FooOff2, Size2)} || X <- [0,1,2] ],
+        ok
     after
+        error_logger:tty(true),
         catch application:stop(machi),
         exit(SupPid, normal)
     end.
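(Editor's aside, not part of the patch: run_middleworker_job/3 generalizes the worker fan-out that update_proj2/2 used to inline -- a disposable middle process spawn_monitors one worker per argument, collects each worker's exit value, and reports the whole batch back to the caller, which gives up and returns [] after WTimeout. The sketch below shows the same shape stripped of Machi specifics; every name in it is hypothetical, and the per-worker timeout handling is simplified relative to gather_worker_statuses/2.)

%% Sketch only: middleworker_sketch and map_with_timeout/3 are hypothetical
%% names; this is the general fan-out shape, not Machi's implementation.
-module(middleworker_sketch).
-export([map_with_timeout/3]).

%% Run Fun(Arg) in its own process for every Arg in Args, in parallel.
%% Each worker's result (or crash reason) is delivered as its exit value via
%% the monitor.  If the whole batch is not done within roughly Timeout ms,
%% the caller gives up and gets [].
map_with_timeout(Fun, Args, Timeout) ->
    Parent = self(),
    Middle =
        spawn(fun() ->
                  PidsMons = [spawn_monitor(fun() -> exit(catch Fun(A)) end)
                              || A <- Args],
                  Results = [receive
                                 {'DOWN', Ref, process, Pid, Res} -> Res
                             after Timeout -> timeout
                             end || {Pid, Ref} <- PidsMons],
                  Parent ! {done, self(), Results}
              end),
    receive
        {done, Middle, Results} -> Results
    after Timeout + 100 -> []
    end.

%% Example: map_with_timeout(fun(X) -> X * X end, [1,2,3], 1000) returns [1,4,9].
%% The middle process keeps the caller's mailbox free of 'DOWN' messages and
%% shields it from worker crashes, which is why the patch keeps that shape.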