From 8b105672b1d03a216db5e95a18a1fdee15179885 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 1 Mar 2014 20:26:11 +0900 Subject: [PATCH] Bugfix for read-repair (thanks PULSE), model change to handle handle aborted writes --- prototype/corfurl/src/corfurl.erl | 21 +++++- prototype/corfurl/src/corfurl_client.erl | 33 +++++++++- prototype/corfurl/test/corfurl_pulse.erl | 84 ++++++++++++++++++++---- 3 files changed, 120 insertions(+), 18 deletions(-) diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl index 71ae149..cfb8f82 100644 --- a/prototype/corfurl/src/corfurl.erl +++ b/prototype/corfurl/src/corfurl.erl @@ -129,7 +129,15 @@ ok_or_trim(error_trimmed) -> ok_or_trim(Else) -> Else. -read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) -> +read_repair_chain(Epoch, LPN, Chain) -> + try + read_repair_chain1(Epoch, LPN, Chain) + catch + throw:{i_give_up,Res} -> + Res + end. + +read_repair_chain1(Epoch, LPN, [Head|Rest] = Chain) -> ?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}), case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of {ok, Page} -> @@ -153,10 +161,17 @@ read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) -> Res2 = ok_or_trim(corfurl_flu:trim( flu_pid(X), Epoch, LPN)), ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}), - Res2; + case Res2 of ok -> ok; + _ -> throw({i_give_up,Res2}) + end; Else -> + %% We're too deeply nested for the current code + %% to deal with, and we're racing. Fine, let + %% our opponent continue. We'll give up, and if + %% the client wants to try again, we can try + %% again from the top. ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}), - Else + throw({i_give_up,Else}) end end || X <- Rest], error_trimmed; diff --git a/prototype/corfurl/src/corfurl_client.erl b/prototype/corfurl/src/corfurl_client.erl index d15ce9b..b875f07 100644 --- a/prototype/corfurl/src/corfurl_client.erl +++ b/prototype/corfurl/src/corfurl_client.erl @@ -22,6 +22,8 @@ -export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]). -export([restart_sequencer/1]). +%% For debugging/verification only +-export([pulse_tracing_start/1, pulse_tracing_add/2, pulse_tracing_get/1]). -include("corfurl.hrl"). @@ -36,6 +38,7 @@ append_page(Proj, _Page, 0) -> append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, Retries) -> try {ok, LPN} = corfurl_sequencer:get(Sequencer, 1), + pulse_tracing_add(write, LPN), append_page1(Proj, LPN, Page, 5) catch exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}} @@ -217,7 +220,7 @@ report_lost_race(LPN, Reason) -> -ifdef(PULSE). get_poll_retries() -> - 9999*1000. + 999*1000. get_poll_sleep_time() -> 1. @@ -230,3 +233,31 @@ get_poll_sleep_time() -> 50. -endif. + +-ifdef(PULSE). + +pulse_tracing_start(Type) -> + put({?MODULE, Type}, []). + +pulse_tracing_add(Type, Stuff) -> + List = case pulse_tracing_get(Type) of + undefined -> []; + L -> L + end, + put({?MODULE, Type}, [Stuff|List]). + +pulse_tracing_get(Type) -> + get({?MODULE, Type}). + +-else. + +pulse_tracing_start(_Type) -> + ok. + +pulse_tracing_add(_Type, _Stuff) -> + ok. + +pulse_tracing_get(_Type) -> + ok. + +-endif. diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl index 48491a7..d00c2d7 100644 --- a/prototype/corfurl/test/corfurl_pulse.erl +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -337,7 +337,8 @@ check_trace(Trace0, _Cmds, _Seed) -> ({call, _Pid, {append, _Pg, will_fail, {special_trimmed, LPN}}}) -> LPN; ({call, _Pid, {read, LPN, _, _}}) -> LPN; ({call, _Pid, {fill, LPN, will_be, ok}}) -> LPN; - ({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN + ({call, _Pid, {trim, LPN, will_be, ok}}) -> LPN; + ({call, _Pid, {goo_write, LPN, _Pg}}) -> LPN end, fun(x) -> [] end, Calls), @@ -405,8 +406,7 @@ check_trace(Trace0, _Cmds, _Seed) -> InitialValDict = orddict:from_list([{LPN, [error_unwritten]} || LPN <- AllLPNs]), - {ValuesR, _} = - lists:mapfoldl( + ValuesRFun = fun({TS1, TS2, StEnds}, Dict1) -> Dict2 = lists:foldl( fun({mod_start, w_1, LPN, Pg}, D) -> @@ -440,7 +440,8 @@ check_trace(Trace0, _Cmds, _Seed) -> orddict:store(LPN, [Pg,error_trimmed], D) end, Dict2, [X || X={mod_end,_,_,_} <- StEnds]), {{TS1, TS2, [{values, Dict3}]}, Dict3} - end, InitialValDict, StartsDones), + end, + {ValuesR, _} = lists:mapfoldl(ValuesRFun, InitialValDict, StartsDones), InitialTtnDict = orddict:from_list([{LPN, [w_0]} || LPN <- AllLPNs]), {TransitionsR, _} = @@ -469,6 +470,14 @@ check_trace(Trace0, _Cmds, _Seed) -> %% Instead, we need to merge together all possible values from ValuesR %% that appear at any time during the read op's lifetime. + PerhapsR = eqc_temporal:stateful( + fun({call, _Pid, {goo_write, LPN, Pg}}) -> + {perhaps, LPN, Pg} + end, + fun(x)-> [] end, + Events), + {_, _, Perhaps} = lists:last(eqc_temporal:all_future(PerhapsR)), + %%?QC_FMT("*Perhaps: ~p\n", [Perhaps]), Reads = eqc_temporal:stateful( fun({call, Pid, {read, LPN, _, _}}) -> {read, Pid, LPN, []} @@ -483,10 +492,27 @@ check_trace(Trace0, _Cmds, _Seed) -> false = NewVs == V1s, {read, Pid, LPN, NewVs}; ({read, Pid, LPN, Vs}, {result, Pid, Pg}) -> + %% case lists:member(Pg, Vs) orelse + %% lists:member({perhaps, LPN, Pg}, Perhaps) of case lists:member(Pg, Vs) of - true -> []; - false -> [{bad, read, LPN, Pid, got, Pg, - possible, Vs}] + true -> + []; + false -> + case lists:member({perhaps, LPN, Pg}, Perhaps) of + true -> + %% The checking of the Perhaps list in + %% this manner is not strictly + %% temporally valid. It is possible + %% for the {perhaps,...} event to be + %% after the event we're checking here. + %% TODO work is to make this check 100% + %% temporally valid. + io:format(user, "Yo, found ~p ~p in Perhaps\n", [LPN, Pg]), + []; + false -> + [{bad, read, LPN, Pid, got, Pg, + possible, Vs}] + end end end, eqc_temporal:union(Events, ValuesR)), BadFilter = fun(bad) -> true; @@ -518,7 +544,8 @@ check_trace(Trace0, _Cmds, _Seed) -> ?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]), ?QC_FMT("*ValuesR: ~p\n", [eqc_temporal:unions([ValuesR, StartsDones])]), ?QC_FMT("*Calls: ~p\n", [Calls]), - ?QC_FMT("*BadReads: ~p\n", [BadReads]) + ?QC_FMT("*BadReads: ~p\n", [BadReads]), + ?QC_FMT("*Perhaps: ~p\n", [Perhaps]) end, conjunction( [ @@ -724,23 +751,52 @@ pick_an_LPN(#proj{seq={Seq,_,_}} = P, SeedInt) -> pick_an_LPN(corfurl_client:restart_sequencer(P), SeedInt) end. --define(LOG(Tag, MkCall), - event_logger:event(log_make_call(Tag), lamport_clock:get()), - LOG__Result = MkCall, - event_logger:event(log_make_result(LOG__Result), lamport_clock:get()), - LOG__Result). +-define(LOG3(Tag, MkCall, PostCall), + begin + LOG__Start = lamport_clock:get(), + event_logger:event(log_make_call(Tag), LOG__Start), + LOG__Result = MkCall, + LOG__End = lamport_clock:get(), + PostCall, + event_logger:event(log_make_result(LOG__Result), LOG__End), + LOG__Result + end). + +-define(LOG(Tag, MkCall), ?LOG3(Tag, MkCall, okqq)). append(#run{proj=OriginalProj}, Page) -> lamport_clock:init(), lamport_clock:incr(), Proj = get_projection(OriginalProj), - ?LOG({append, Page}, + ?LOG3({append, Page}, try + corfurl_client:pulse_tracing_start(write), {Res, Proj2} = corfurl_client:append_page(Proj, Page), put_projection(Proj2), + OtherPages0 = lists:usort(corfurl_client:pulse_tracing_get(write)), + OtherPages = case Res of + {ok, LPN} -> + OtherPages0 -- [LPN]; + _ -> + OtherPages0 + end, + put(zzzOtherPages, OtherPages), perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page) catch X:Y -> {caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()} + end, + try + OPages = get(zzzOtherPages), + %%if OPages /= [] -> io:format("OPages = ~w\n", [OPages]); true -> ok end, + GooPid = {self(), goo, now()}, + [begin + event_logger:event(log_make_call(GooPid, {goo_write, OP, Page}), + LOG__Start), + event_logger:event(log_make_result(GooPid, who_knows), + LOG__End) + end || OP <- OPages] + catch XX:YY -> + exit({oops, ?MODULE, ?LINE, XX, YY, erlang:get_stacktrace()}) end). read_result_mangle({ok, Page}) ->