Fix known chain repair bugs, add basic smoke test

This commit is contained in:
Scott Lystig Fritchie 2015-05-19 19:32:48 +09:00
parent 152e487060
commit a4266e8aa4
4 changed files with 183 additions and 64 deletions

View file

@ -1930,9 +1930,12 @@ perhaps_start_repair(
%% RepairOpts = [{repair_mode, check}, verbose], %% RepairOpts = [{repair_mode, check}, verbose],
RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end, RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
LastUPI = lists:last(UPI), LastUPI = lists:last(UPI),
IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time,
S#ch_mgr.opts, false),
case timer:now_diff(os:timestamp(), Start) div 1000000 of case timer:now_diff(os:timestamp(), Start) div 1000000 of
N when MyName == LastUPI, N when MyName == LastUPI andalso
N >= ?REPAIR_START_STABILITY_TIME -> (IgnoreStabilityTime_p orelse
N >= ?REPAIR_START_STABILITY_TIME) ->
{WorkerPid, _Ref} = spawn_monitor(RepairFun), {WorkerPid, _Ref} = spawn_monitor(RepairFun),
S#ch_mgr{repair_worker=WorkerPid, S#ch_mgr{repair_worker=WorkerPid,
repair_start=os:timestamp(), repair_start=os:timestamp(),

View file

@ -107,8 +107,6 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
catch catch
What:Why -> What:Why ->
Stack = erlang:get_stacktrace(), Stack = erlang:get_stacktrace(),
io:format(user, "What Why ~p ~p @\n\t~p\n",
[What, Why, Stack]),
{error, {What, Why, Stack}} {error, {What, Why, Stack}}
after after
[(catch machi_proxy_flu1_client:quit(Pid)) || [(catch machi_proxy_flu1_client:quit(Pid)) ||

View file

@ -261,7 +261,7 @@ handle_call({req, Req}, From, S) ->
handle_call(quit, _From, S) -> handle_call(quit, _From, S) ->
{stop, normal, ok, S}; {stop, normal, ok, S};
handle_call(_Request, _From, S) -> handle_call(_Request, _From, S) ->
Reply = ok, Reply = whaaaaaaaaaaaaaaaaaaaa,
{reply, Reply, S}. {reply, Reply, S}.
handle_cast(_Msg, S) -> handle_cast(_Msg, S) ->
@ -292,7 +292,7 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME -> if DiffMs > ?MAX_RUNTIME ->
{error, partition}; {reply, {error, partition}, S};
true -> true ->
%% This is suboptimal for performance: there are some paths %% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good %% through this point where our current projection is good
@ -345,12 +345,12 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME -> if DiffMs > ?MAX_RUNTIME ->
{error, partition}; {reply, {error, partition}, S};
true -> true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}), S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of case S2#state.proj of
undefined -> undefined ->
{error, partition}; {reply, {error, partition}, S};
P2 -> P2 ->
RestFLUs2 = mutation_flus(P2), RestFLUs2 = mutation_flus(P2),
case RestFLUs2 -- Ws of case RestFLUs2 -- Ws of
@ -400,13 +400,13 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
do_read_chunk(File, Offset, Size, 0=Depth, STime, do_read_chunk(File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth, STime, S); do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);
do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read sleep1,", []), %% io:format(user, "read sleep1,", []),
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME -> if DiffMs > ?MAX_RUNTIME ->
{error, partition}; {reply, {error, partition}, S};
true -> true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}), S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of case S2#state.proj of
@ -421,8 +421,8 @@ do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
do_read_chunk2(File, Offset, Size, Depth, STime, do_read_chunk2(File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
UPI = readonly_flus(P), UPI = readonly_flus(P),
Head = hd(UPI),
Tail = lists:last(UPI), Tail = lists:last(UPI),
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size -> {ok, Chunk} when byte_size(Chunk) == Size ->
@ -433,52 +433,84 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
{error, Retry} {error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged -> when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_read_chunk(File, Offset, Size, Depth, STime, S); do_read_chunk(File, Offset, Size, Depth, STime, S);
{error, not_written} when Tail == Head -> {error, not_written} ->
{{error, not_written}, S}; read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
{error, not_written} when Tail /= Head -> %% {reply, {error, not_written}, S};
read_repair(read, File, Offset, Size, Depth, STime, S);
{error, written} -> {error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end. end.
read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime, %% Read repair: depends on the consistency mode that we're in:
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty %%
read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S); %% CP mode: If the head is written, then use it to repair UPI++Repairing.
read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% If head is not_written, then do nothing.
%% io:format(user, "read_repair sleep1,", []), %% AP mode: If any FLU in UPI++Repairing is written, then use it to repair
%% UPI+repairing.
%% If all FLUs in UPI++Repairing are not_written, then do nothing.
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth,
STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1,
STime, S);
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth,
STime, #state{proj=P}=S) ->
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME -> if DiffMs > ?MAX_RUNTIME ->
{error, partition}; {reply, {error, partition}, S};
true -> true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}), S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of case S2#state.proj of
P2 when P2 == undefined orelse P2 when P2 == undefined orelse
P2#projection_v1.upi == [] -> P2#projection_v1.upi == [] ->
read_repair(ReturnMode, File, Offset, Size, read_repair(ConsistencyMode, ReturnMode, File, Offset,
Depth + 1, STime, S2); Size, Depth + 1, STime, S2);
_ -> _ ->
read_repair2(ReturnMode, File, Offset, Size, read_repair2(ConsistencyMode, ReturnMode, File, Offset,
Depth + 1, STime, S2) Size, Depth + 1, STime, S2)
end end
end. end.
read_repair2(ReturnMode, File, Offset, Size, Depth, STime, read_repair2(cp_mode=ConsistencyMode,
ReturnMode, File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
[Head|MidsTails] = readonly_flus(P), Tail = lists:last(readonly_flus(P)),
case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size -> {ok, Chunk} when byte_size(Chunk) == Size ->
read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset, ToRepair = mutation_flus(P) -- [Tail],
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
Size, Depth, STime, S); Size, Depth, STime, S);
{ok, BadChunk} -> {ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
got, byte_size(BadChunk)}); Size, got, byte_size(BadChunk)});
{error, Retry} {error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged -> when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair(ReturnMode, File, Offset, Size, Depth, STime, S); read_repair(ConsistencyMode, ReturnMode, File, Offset,
Size, Depth, STime, S);
{error, not_written} -> {error, not_written} ->
{{error, not_written}, S}; {reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end;
read_repair2(ap_mode=ConsistencyMode,
ReturnMode, File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
ToRepair = mutation_flus(P) -- [GotItFrom],
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
Offset, Size, Depth, STime, S);
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair(ConsistencyMode, ReturnMode, File,
Offset, Size, Depth, STime, S);
{error, not_written} ->
{reply, {error, not_written}, S};
{error, written} -> {error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end. end.
@ -487,27 +519,27 @@ read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) -> Size, Depth, STime, S) ->
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S); Size, Depth, STime, S);
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, 0=Depth, STime, S) -> Size, 0=Depth, STime, S) ->
read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S); Size, Depth + 1, STime, S);
read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset, read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{proj=P}=S) -> Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []), %% io:format(user, "read_repair3 sleep1,", []),
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME -> if DiffMs > ?MAX_RUNTIME ->
{error, partition}; {reply, {error, partition}, S};
true -> true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}), S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of case S2#state.proj of
P2 when P2 == undefined orelse P2 when P2 == undefined orelse
P2#projection_v1.upi == [] -> P2#projection_v1.upi == [] ->
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2); Offset, Size, Depth + 1, STime, S2);
P2 -> P2 ->
MidsTails2 = P2#projection_v1.upi -- Repaired, ToRepair2 = mutation_flus(P2) -- Repaired,
read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File, read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2) Offset, Size, Depth + 1, STime, S2)
end end
end. end.
@ -518,7 +550,7 @@ read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
read -> read ->
{reply, {ok, Chunk}, S} {reply, {ok, Chunk}, S}
end; end;
read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(First, PD), Proxy = orddict:fetch(First, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
@ -527,7 +559,7 @@ read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset,
Offset, Size, Depth, STime, S); Offset, Size, Depth, STime, S);
{error, Retry} {error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged -> when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S); Offset, Size, Depth, STime, S);
{error, written} -> {error, written} ->
%% TODO: To be very paranoid, read the chunk here to verify %% TODO: To be very paranoid, read the chunk here to verify
@ -545,22 +577,12 @@ update_proj(S) ->
update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
Timeout = 2*1000, Timeout = 2*1000,
Parent = self(), WTimeout = 2*Timeout,
Proxies = orddict:to_list(ProxiesDict), Proxies = orddict:to_list(ProxiesDict),
MiddleWorker = spawn( Work = fun({_K, Proxy}) ->
fun() -> ?FLU_PC:read_latest_projection(Proxy, private, Timeout)
PidsMons = end,
[spawn_monitor(fun() -> Rs = run_middleworker_job(Work, Proxies, WTimeout),
exit(catch ?FLU_PC:read_latest_projection(
Proxy, private, Timeout))
end) || {_K, Proxy} <- Proxies],
Rs = gather_worker_statuses(PidsMons, Timeout*2),
Parent ! {res, self(), Rs},
exit(normal)
end),
Rs = receive {res, MiddleWorker, Results} -> Results
after Timeout*2 -> []
end,
%% TODO: There's a possible bug here when running multiple independent %% TODO: There's a possible bug here when running multiple independent
%% Machi clusters/chains. If our chain used to be [a,b], but our %% Machi clusters/chains. If our chain used to be [a,b], but our
%% sysadmin has changed our cluster to be [a] and a completely seprate %% sysadmin has changed our cluster to be [a] and a completely seprate
@ -581,6 +603,26 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
update_proj2(Count + 1, S) update_proj2(Count + 1, S)
end. end.
run_middleworker_job(Fun, ArgList, WTimeout) ->
Parent = self(),
MiddleWorker =
spawn(fun() ->
PidsMons =
[spawn_monitor(fun() ->
Res = (catch Fun(Arg)),
exit(Res)
end) || Arg <- ArgList],
Rs = gather_worker_statuses(PidsMons, WTimeout),
Parent ! {res, self(), Rs},
exit(normal)
end),
receive
{res, MiddleWorker, Results} ->
Results
after WTimeout+100 ->
[]
end.
gather_worker_statuses([], _Timeout) -> gather_worker_statuses([], _Timeout) ->
[]; [];
gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> gather_worker_statuses([{Pid,Ref}|Rest], Timeout) ->
@ -600,6 +642,34 @@ choose_best_proj(Rs) ->
BestEpoch BestEpoch
end, WorstEpoch, Rs). end, WorstEpoch, Rs).
try_to_find_chunk(Eligible, File, Offset, Size,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
Timeout = 2*1000,
Work = fun(FLU) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{FLU, {ok, Chunk}};
Else ->
{FLU, Else}
end
end,
Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] ->
{ok, Chunk, FoundFLU};
[] ->
RetryErrs = [partition, bad_epoch, wedged],
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
[SomeErr|_] ->
{error, SomeErr};
[] ->
%% TODO does this really work 100% of the time?
{error, not_written}
end
end.
mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) ->
UPI ++ Repairing; UPI ++ Repairing;
mutation_flus(#state{proj=P}) -> mutation_flus(#state{proj=P}) ->

View file

@ -32,37 +32,73 @@
smoke_test() -> smoke_test() ->
os:cmd("rm -rf ./data.a ./data.b ./data.c"), os:cmd("rm -rf ./data.a ./data.b ./data.c"),
{ok, SupPid} = machi_flu_sup:start_link(), {ok, SupPid} = machi_flu_sup:start_link(),
error_logger:tty(false),
try try
Prefix = <<"pre">>, Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>, Chunk1 = <<"yochunk">>,
Host = "localhost", Host = "localhost",
PortBase = 4444, PortBase = 4444,
{ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", []), Os = [{ignore_stability_time, true}, {active_mode, false}],
{ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", []), {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", Os),
{ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", []), {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", Os),
{ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", Os),
D = orddict:from_list( D = orddict:from_list(
[{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}}, [{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}},
{b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}}, {b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}},
{c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]), {c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]),
%% Force the chain to repair & fully assemble as quickly as possible.
%% 1. Use set_chain_members() on all 3
%% 2. Force a to run repair in a tight loop
%% 3. Stop as soon as we see UPI=[a,b,c] and also author=c.
%% Otherwise, we can have a race with later, minor
%% projection changes which will change our understanding of
%% the epoch id. (C is the author with highest weight.)
%% 4. Wait until all others are using epoch id from #3.
%%
%% Damn, this is a pain to make 100% deterministic, bleh.
ok = machi_chain_manager1:set_chain_members(a_chmgr, D), ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
ok = machi_chain_manager1:set_chain_members(b_chmgr, D), ok = machi_chain_manager1:set_chain_members(b_chmgr, D),
ok = machi_chain_manager1:set_chain_members(c_chmgr, D), ok = machi_chain_manager1:set_chain_members(c_chmgr, D),
machi_projection_store:read_latest_projection(a_pstore, private), TickAll = fun() -> [begin
Pid ! tick_check_environment,
timer:sleep(50)
end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ]
end,
_ = lists:foldl(
fun(_, [{c,[a,b,c]}]=Acc) -> Acc;
(_, Acc) ->
TickAll(), % has some sleep time inside
Xs = [begin
{ok, Prj} = machi_projection_store:read_latest_projection(PStore, private),
{Prj#projection_v1.author_server,
Prj#projection_v1.upi}
end || PStore <- [a_pstore,b_pstore,c_pstore] ],
lists:usort(Xs)
end, undefined, lists:seq(1,10000)),
%% Everyone is settled on the same damn epoch id.
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0,
private),
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+1,
private),
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+2,
private),
%% Whew ... ok, now start some damn tests.
{ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]),
machi_cr_client:append_chunk(C1, Prefix, Chunk1), machi_cr_client:append_chunk(C1, Prefix, Chunk1),
%% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)}, %% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)},
{ok, {Off1,Size1,File1}} = {ok, {Off1,Size1,File1}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1), machi_cr_client:append_chunk(C1, Prefix, Chunk1),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private), private),
%% Verify that the client's CR wrote to all of them. %% Verify that the client's CR wrote to all of them.
[{ok, Chunk1} = machi_flu1_client:read_chunk( [{ok, Chunk1} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) || Host, PortBase+X, EpochID, File1, Off1, Size1) ||
X <- [0,1,2] ], X <- [0,1,2] ],
%% Manually write to head, then verify that read-repair fixes all. %% Test read repair: Manually write to head, then verify that
%% read-repair fixes all.
FooOff1 = Off1 + (1024*1024), FooOff1 = Off1 + (1024*1024),
[{error, not_written} = machi_flu1_client:read_chunk( [{error, not_written} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
@ -74,8 +110,20 @@ smoke_test() ->
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ], File1, FooOff1, Size1)} || X <- [0,1,2] ],
%% Test read repair: Manually write to middle, then same checking.
FooOff2 = Off1 + (2*1024*1024),
Chunk2 = <<"Middle repair chunk">>,
Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2),
{ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2)} || X <- [0,1,2] ],
ok ok
after after
error_logger:tty(true),
catch application:stop(machi), catch application:stop(machi),
exit(SupPid, normal) exit(SupPid, normal)
end. end.