Bugfix for read-repair (thanks PULSE), model change to handle handle aborted writes
This commit is contained in:
parent
b7b9255f5f
commit
8b105672b1
3 changed files with 120 additions and 18 deletions
|
@ -129,7 +129,15 @@ ok_or_trim(error_trimmed) ->
|
||||||
ok_or_trim(Else) ->
|
ok_or_trim(Else) ->
|
||||||
Else.
|
Else.
|
||||||
|
|
||||||
read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) ->
|
read_repair_chain(Epoch, LPN, Chain) ->
|
||||||
|
try
|
||||||
|
read_repair_chain1(Epoch, LPN, Chain)
|
||||||
|
catch
|
||||||
|
throw:{i_give_up,Res} ->
|
||||||
|
Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
read_repair_chain1(Epoch, LPN, [Head|Rest] = Chain) ->
|
||||||
?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}),
|
?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}),
|
||||||
case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of
|
case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of
|
||||||
{ok, Page} ->
|
{ok, Page} ->
|
||||||
|
@ -153,10 +161,17 @@ read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) ->
|
||||||
Res2 = ok_or_trim(corfurl_flu:trim(
|
Res2 = ok_or_trim(corfurl_flu:trim(
|
||||||
flu_pid(X), Epoch, LPN)),
|
flu_pid(X), Epoch, LPN)),
|
||||||
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}),
|
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}),
|
||||||
Res2;
|
case Res2 of ok -> ok;
|
||||||
|
_ -> throw({i_give_up,Res2})
|
||||||
|
end;
|
||||||
Else ->
|
Else ->
|
||||||
|
%% We're too deeply nested for the current code
|
||||||
|
%% to deal with, and we're racing. Fine, let
|
||||||
|
%% our opponent continue. We'll give up, and if
|
||||||
|
%% the client wants to try again, we can try
|
||||||
|
%% again from the top.
|
||||||
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}),
|
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}),
|
||||||
Else
|
throw({i_give_up,Else})
|
||||||
end
|
end
|
||||||
end || X <- Rest],
|
end || X <- Rest],
|
||||||
error_trimmed;
|
error_trimmed;
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
|
|
||||||
-export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
|
-export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
|
||||||
-export([restart_sequencer/1]).
|
-export([restart_sequencer/1]).
|
||||||
|
%% For debugging/verification only
|
||||||
|
-export([pulse_tracing_start/1, pulse_tracing_add/2, pulse_tracing_get/1]).
|
||||||
|
|
||||||
-include("corfurl.hrl").
|
-include("corfurl.hrl").
|
||||||
|
|
||||||
|
@ -36,6 +38,7 @@ append_page(Proj, _Page, 0) ->
|
||||||
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, Retries) ->
|
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, Retries) ->
|
||||||
try
|
try
|
||||||
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1),
|
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1),
|
||||||
|
pulse_tracing_add(write, LPN),
|
||||||
append_page1(Proj, LPN, Page, 5)
|
append_page1(Proj, LPN, Page, 5)
|
||||||
catch
|
catch
|
||||||
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
|
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
|
||||||
|
@ -217,7 +220,7 @@ report_lost_race(LPN, Reason) ->
|
||||||
|
|
||||||
-ifdef(PULSE).
|
-ifdef(PULSE).
|
||||||
get_poll_retries() ->
|
get_poll_retries() ->
|
||||||
9999*1000.
|
999*1000.
|
||||||
|
|
||||||
get_poll_sleep_time() ->
|
get_poll_sleep_time() ->
|
||||||
1.
|
1.
|
||||||
|
@ -230,3 +233,31 @@ get_poll_sleep_time() ->
|
||||||
50.
|
50.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
-ifdef(PULSE).
|
||||||
|
|
||||||
|
pulse_tracing_start(Type) ->
|
||||||
|
put({?MODULE, Type}, []).
|
||||||
|
|
||||||
|
pulse_tracing_add(Type, Stuff) ->
|
||||||
|
List = case pulse_tracing_get(Type) of
|
||||||
|
undefined -> [];
|
||||||
|
L -> L
|
||||||
|
end,
|
||||||
|
put({?MODULE, Type}, [Stuff|List]).
|
||||||
|
|
||||||
|
pulse_tracing_get(Type) ->
|
||||||
|
get({?MODULE, Type}).
|
||||||
|
|
||||||
|
-else.
|
||||||
|
|
||||||
|
pulse_tracing_start(_Type) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
pulse_tracing_add(_Type, _Stuff) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
pulse_tracing_get(_Type) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
|
@ -337,7 +337,8 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
({call, _Pid, {append, _Pg, will_fail, {special_trimmed, LPN}}}) -> LPN;
|
({call, _Pid, {append, _Pg, will_fail, {special_trimmed, LPN}}}) -> LPN;
|
||||||
({call, _Pid, {read, LPN, _, _}}) -> LPN;
|
({call, _Pid, {read, LPN, _, _}}) -> LPN;
|
||||||
({call, _Pid, {fill, LPN, will_be, ok}}) -> LPN;
|
({call, _Pid, {fill, LPN, will_be, ok}}) -> LPN;
|
||||||
({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN
|
({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN;
|
||||||
|
({call, _Pid, {goo_write, LPN, _Pg}}) -> LPN
|
||||||
end,
|
end,
|
||||||
fun(x) -> [] end,
|
fun(x) -> [] end,
|
||||||
Calls),
|
Calls),
|
||||||
|
@ -405,8 +406,7 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
|
|
||||||
InitialValDict = orddict:from_list([{LPN, [error_unwritten]} ||
|
InitialValDict = orddict:from_list([{LPN, [error_unwritten]} ||
|
||||||
LPN <- AllLPNs]),
|
LPN <- AllLPNs]),
|
||||||
{ValuesR, _} =
|
ValuesRFun =
|
||||||
lists:mapfoldl(
|
|
||||||
fun({TS1, TS2, StEnds}, Dict1) ->
|
fun({TS1, TS2, StEnds}, Dict1) ->
|
||||||
Dict2 = lists:foldl(
|
Dict2 = lists:foldl(
|
||||||
fun({mod_start, w_1, LPN, Pg}, D) ->
|
fun({mod_start, w_1, LPN, Pg}, D) ->
|
||||||
|
@ -440,7 +440,8 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
orddict:store(LPN, [Pg,error_trimmed], D)
|
orddict:store(LPN, [Pg,error_trimmed], D)
|
||||||
end, Dict2, [X || X={mod_end,_,_,_} <- StEnds]),
|
end, Dict2, [X || X={mod_end,_,_,_} <- StEnds]),
|
||||||
{{TS1, TS2, [{values, Dict3}]}, Dict3}
|
{{TS1, TS2, [{values, Dict3}]}, Dict3}
|
||||||
end, InitialValDict, StartsDones),
|
end,
|
||||||
|
{ValuesR, _} = lists:mapfoldl(ValuesRFun, InitialValDict, StartsDones),
|
||||||
|
|
||||||
InitialTtnDict = orddict:from_list([{LPN, [w_0]} || LPN <- AllLPNs]),
|
InitialTtnDict = orddict:from_list([{LPN, [w_0]} || LPN <- AllLPNs]),
|
||||||
{TransitionsR, _} =
|
{TransitionsR, _} =
|
||||||
|
@ -469,6 +470,14 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
%% Instead, we need to merge together all possible values from ValuesR
|
%% Instead, we need to merge together all possible values from ValuesR
|
||||||
%% that appear at any time during the read op's lifetime.
|
%% that appear at any time during the read op's lifetime.
|
||||||
|
|
||||||
|
PerhapsR = eqc_temporal:stateful(
|
||||||
|
fun({call, _Pid, {goo_write, LPN, Pg}}) ->
|
||||||
|
{perhaps, LPN, Pg}
|
||||||
|
end,
|
||||||
|
fun(x)-> [] end,
|
||||||
|
Events),
|
||||||
|
{_, _, Perhaps} = lists:last(eqc_temporal:all_future(PerhapsR)),
|
||||||
|
%%?QC_FMT("*Perhaps: ~p\n", [Perhaps]),
|
||||||
Reads = eqc_temporal:stateful(
|
Reads = eqc_temporal:stateful(
|
||||||
fun({call, Pid, {read, LPN, _, _}}) ->
|
fun({call, Pid, {read, LPN, _, _}}) ->
|
||||||
{read, Pid, LPN, []}
|
{read, Pid, LPN, []}
|
||||||
|
@ -483,10 +492,27 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
false = NewVs == V1s,
|
false = NewVs == V1s,
|
||||||
{read, Pid, LPN, NewVs};
|
{read, Pid, LPN, NewVs};
|
||||||
({read, Pid, LPN, Vs}, {result, Pid, Pg}) ->
|
({read, Pid, LPN, Vs}, {result, Pid, Pg}) ->
|
||||||
|
%% case lists:member(Pg, Vs) orelse
|
||||||
|
%% lists:member({perhaps, LPN, Pg}, Perhaps) of
|
||||||
case lists:member(Pg, Vs) of
|
case lists:member(Pg, Vs) of
|
||||||
true -> [];
|
true ->
|
||||||
false -> [{bad, read, LPN, Pid, got, Pg,
|
[];
|
||||||
possible, Vs}]
|
false ->
|
||||||
|
case lists:member({perhaps, LPN, Pg}, Perhaps) of
|
||||||
|
true ->
|
||||||
|
%% The checking of the Perhaps list in
|
||||||
|
%% this manner is not strictly
|
||||||
|
%% temporally valid. It is possible
|
||||||
|
%% for the {perhaps,...} event to be
|
||||||
|
%% after the event we're checking here.
|
||||||
|
%% TODO work is to make this check 100%
|
||||||
|
%% temporally valid.
|
||||||
|
io:format(user, "Yo, found ~p ~p in Perhaps\n", [LPN, Pg]),
|
||||||
|
[];
|
||||||
|
false ->
|
||||||
|
[{bad, read, LPN, Pid, got, Pg,
|
||||||
|
possible, Vs}]
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end, eqc_temporal:union(Events, ValuesR)),
|
end, eqc_temporal:union(Events, ValuesR)),
|
||||||
BadFilter = fun(bad) -> true;
|
BadFilter = fun(bad) -> true;
|
||||||
|
@ -518,7 +544,8 @@ check_trace(Trace0, _Cmds, _Seed) ->
|
||||||
?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]),
|
?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]),
|
||||||
?QC_FMT("*ValuesR: ~p\n", [eqc_temporal:unions([ValuesR, StartsDones])]),
|
?QC_FMT("*ValuesR: ~p\n", [eqc_temporal:unions([ValuesR, StartsDones])]),
|
||||||
?QC_FMT("*Calls: ~p\n", [Calls]),
|
?QC_FMT("*Calls: ~p\n", [Calls]),
|
||||||
?QC_FMT("*BadReads: ~p\n", [BadReads])
|
?QC_FMT("*BadReads: ~p\n", [BadReads]),
|
||||||
|
?QC_FMT("*Perhaps: ~p\n", [Perhaps])
|
||||||
end,
|
end,
|
||||||
conjunction(
|
conjunction(
|
||||||
[
|
[
|
||||||
|
@ -724,23 +751,52 @@ pick_an_LPN(#proj{seq={Seq,_,_}} = P, SeedInt) ->
|
||||||
pick_an_LPN(corfurl_client:restart_sequencer(P), SeedInt)
|
pick_an_LPN(corfurl_client:restart_sequencer(P), SeedInt)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-define(LOG(Tag, MkCall),
|
-define(LOG3(Tag, MkCall, PostCall),
|
||||||
event_logger:event(log_make_call(Tag), lamport_clock:get()),
|
begin
|
||||||
LOG__Result = MkCall,
|
LOG__Start = lamport_clock:get(),
|
||||||
event_logger:event(log_make_result(LOG__Result), lamport_clock:get()),
|
event_logger:event(log_make_call(Tag), LOG__Start),
|
||||||
LOG__Result).
|
LOG__Result = MkCall,
|
||||||
|
LOG__End = lamport_clock:get(),
|
||||||
|
PostCall,
|
||||||
|
event_logger:event(log_make_result(LOG__Result), LOG__End),
|
||||||
|
LOG__Result
|
||||||
|
end).
|
||||||
|
|
||||||
|
-define(LOG(Tag, MkCall), ?LOG3(Tag, MkCall, okqq)).
|
||||||
|
|
||||||
append(#run{proj=OriginalProj}, Page) ->
|
append(#run{proj=OriginalProj}, Page) ->
|
||||||
lamport_clock:init(),
|
lamport_clock:init(),
|
||||||
lamport_clock:incr(),
|
lamport_clock:incr(),
|
||||||
Proj = get_projection(OriginalProj),
|
Proj = get_projection(OriginalProj),
|
||||||
?LOG({append, Page},
|
?LOG3({append, Page},
|
||||||
try
|
try
|
||||||
|
corfurl_client:pulse_tracing_start(write),
|
||||||
{Res, Proj2} = corfurl_client:append_page(Proj, Page),
|
{Res, Proj2} = corfurl_client:append_page(Proj, Page),
|
||||||
put_projection(Proj2),
|
put_projection(Proj2),
|
||||||
|
OtherPages0 = lists:usort(corfurl_client:pulse_tracing_get(write)),
|
||||||
|
OtherPages = case Res of
|
||||||
|
{ok, LPN} ->
|
||||||
|
OtherPages0 -- [LPN];
|
||||||
|
_ ->
|
||||||
|
OtherPages0
|
||||||
|
end,
|
||||||
|
put(zzzOtherPages, OtherPages),
|
||||||
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
|
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
|
||||||
catch X:Y ->
|
catch X:Y ->
|
||||||
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
|
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
|
||||||
|
end,
|
||||||
|
try
|
||||||
|
OPages = get(zzzOtherPages),
|
||||||
|
%%if OPages /= [] -> io:format("OPages = ~w\n", [OPages]); true -> ok end,
|
||||||
|
GooPid = {self(), goo, now()},
|
||||||
|
[begin
|
||||||
|
event_logger:event(log_make_call(GooPid, {goo_write, OP, Page}),
|
||||||
|
LOG__Start),
|
||||||
|
event_logger:event(log_make_result(GooPid, who_knows),
|
||||||
|
LOG__End)
|
||||||
|
end || OP <- OPages]
|
||||||
|
catch XX:YY ->
|
||||||
|
exit({oops, ?MODULE, ?LINE, XX, YY, erlang:get_stacktrace()})
|
||||||
end).
|
end).
|
||||||
|
|
||||||
read_result_mangle({ok, Page}) ->
|
read_result_mangle({ok, Page}) ->
|
||||||
|
|
Loading…
Reference in a new issue