PULSE test now uses corfurl_client (retry logic) for all ops

This commit is contained in:
Scott Lystig Fritchie 2014-02-27 14:36:07 +09:00
parent 7ac1e7f178
commit 40c28b79bb
2 changed files with 55 additions and 24 deletions

View file

@ -20,7 +20,7 @@
-module(corfurl_client). -module(corfurl_client).
-export([append_page/2, read_page/2, fill_page/2, trim_page/2]). -export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
-export([restart_sequencer/1]). -export([restart_sequencer/1]).
-include("corfurl.hrl"). -include("corfurl.hrl").
@ -76,11 +76,30 @@ append_page2(Proj, LPN, Page) ->
read_page(Proj, LPN) -> read_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:read_page(P, LPN) end, 10). retry_loop(Proj, fun(P) -> corfurl:read_page(P, LPN) end, 10).
fill_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:fill_page(P, LPN) end, 10).
trim_page(Proj, LPN) -> trim_page(Proj, LPN) ->
retry_loop(Proj, fun(P) -> corfurl:trim_page(P, LPN) end, 10). retry_loop(Proj, fun(P) -> corfurl:trim_page(P, LPN) end, 10).
fill_page(Proj, LPN) -> scan_forward(Proj, LPN, MaxPages) ->
retry_loop(Proj, fun(P) -> corfurl:fill_page(P, LPN) end, 10). %% This is fiddly stuff that I'll get 0.7% wrong if I try to be clever.
%% So, do something simple and (I hope) obviously correct.
%% TODO: do something "smarter".
case corfurl:scan_forward(Proj, LPN, MaxPages) of
{error_badepoch, _LPN2, _MoreP, _Pages} = Res ->
case poll_for_new_epoch_projection(Proj) of
{ok, NewProj} ->
{Res, NewProj};
_Else ->
%% TODO: What is the risk of getting caught in a situation
%% where we can never make any forward progress when pages
%% really are being written?
{Res, Proj}
end;
Res ->
{Res, Proj}
end.
%%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%% %%%%%

View file

@ -118,14 +118,14 @@ command(#state{run=Run} = S) ->
|| not S#state.is_setup] ++ || not S#state.is_setup] ++
[{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}} [{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}}
|| S#state.is_setup] ++ || S#state.is_setup] ++
%% [{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}} [{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}}
%% || S#state.is_setup] ++ || S#state.is_setup] ++
%% [{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}} [{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}}
%% || S#state.is_setup] ++ || S#state.is_setup] ++
%% [{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}} [{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}}
%% || S#state.is_setup] ++ || S#state.is_setup] ++
%% [{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}} [{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}}
%% || S#state.is_setup] ++ || S#state.is_setup] ++
[{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}} [{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}}
|| S#state.is_setup] ++ || S#state.is_setup] ++
[])). [])).
@ -740,8 +740,7 @@ append(#run{proj=OriginalProj}, Page) ->
put_projection(Proj2), put_projection(Proj2),
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page) perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
catch X:Y -> catch X:Y ->
io:format("APPEND ~p\n", [{error, append, X, Y, erlang:get_stacktrace()}]), {caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
{error, append, X, Y, erlang:get_stacktrace()}
end). end).
read_result_mangle({ok, Page}) -> read_result_mangle({ok, Page}) ->
@ -755,10 +754,13 @@ read_approx(#run{proj=OriginalProj}, SeedInt) ->
Proj = get_projection(OriginalProj), Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt), LPN = pick_an_LPN(Proj, SeedInt),
?LOG({read, LPN}, ?LOG({read, LPN},
begin try
Res = read_result_mangle(corfurl:read_page(Proj, LPN)), {Res, Proj2} = corfurl_client:read_page(Proj, LPN),
put_projection(Proj), put_projection(Proj2),
perhaps_trip_read_approx(?TRIP_bad_read, Res, LPN) Res2 = read_result_mangle(Res),
perhaps_trip_read_approx(?TRIP_bad_read, Res2, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end). end).
scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) -> scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) ->
@ -773,10 +775,12 @@ scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) ->
%% it appear as if each LPN result that scan_forward() gives us came %% it appear as if each LPN result that scan_forward() gives us came
%% instead from a single-page read_page() call. %% instead from a single-page read_page() call.
?LOG({scan_forward, StartLPN, NumPages}, ?LOG({scan_forward, StartLPN, NumPages},
begin try
TS1 = lamport_clock:get(), TS1 = lamport_clock:get(),
case corfurl:scan_forward(Proj, StartLPN, NumPages) of case corfurl_client:scan_forward(Proj, StartLPN, NumPages) of
{ok, EndLPN, MoreP, Pages} -> {{Res, EndLPN, MoreP, Pages}, Proj2}
when Res == ok; Res == error_badepoch ->
put_projection(Proj2),
PageIs = lists:zip(Pages, lists:seq(1, length(Pages))), PageIs = lists:zip(Pages, lists:seq(1, length(Pages))),
TS2 = lamport_clock:get(), TS2 = lamport_clock:get(),
[begin [begin
@ -792,6 +796,8 @@ scan_forward(#run{proj=OriginalProj}, SeedInt, NumPages) ->
{LPN, P} <- Pages], {LPN, P} <- Pages],
{ok, EndLPN, MoreP, Ps} {ok, EndLPN, MoreP, Ps}
end end
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end). end).
fill(#run{proj=OriginalProj}, SeedInt) -> fill(#run{proj=OriginalProj}, SeedInt) ->
@ -800,9 +806,12 @@ fill(#run{proj=OriginalProj}, SeedInt) ->
Proj = get_projection(OriginalProj), Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt), LPN = pick_an_LPN(Proj, SeedInt),
?LOG({fill, LPN}, ?LOG({fill, LPN},
begin try
Res = corfurl:fill_page(Proj, LPN), {Res, Proj2} = corfurl_client:fill_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN) perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end). end).
trim(#run{proj=OriginalProj}, SeedInt) -> trim(#run{proj=OriginalProj}, SeedInt) ->
@ -811,9 +820,12 @@ trim(#run{proj=OriginalProj}, SeedInt) ->
Proj = get_projection(OriginalProj), Proj = get_projection(OriginalProj),
LPN = pick_an_LPN(Proj, SeedInt), LPN = pick_an_LPN(Proj, SeedInt),
?LOG({trim, LPN}, ?LOG({trim, LPN},
begin try
Res = corfurl:trim_page(Proj, LPN), {Res, Proj2} = corfurl_client:trim_page(Proj, LPN),
put_projection(Proj2),
perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN) perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN)
catch X:Y ->
{caught, ?MODULE, ?LINE, X, Y, erlang:get_stacktrace()}
end). end).
stop_sequencer(#run{proj=OriginalProj}, Method) -> stop_sequencer(#run{proj=OriginalProj}, Method) ->