Finish very basic PULSE testing of stopping & restarting the sequencer
This commit is contained in:
parent
63d1c93fc9
commit
fb1216649c
7 changed files with 76 additions and 23 deletions
|
@ -18,6 +18,9 @@ case PulseBuild of
|
|||
, {corfurl_flu, trim, '_'}
|
||||
, {corfurl_flu, fill, '_'}
|
||||
|
||||
, {corfurl, read_projection, '_'}
|
||||
, {corfurl, save_projection, '_'}
|
||||
|
||||
, {prim_file, '_', '_'}
|
||||
, {file, '_', '_'}
|
||||
, {filelib, '_', '_'}
|
||||
|
|
|
@ -320,8 +320,13 @@ save_projection(Dir, #proj{epoch=Epoch} = P) ->
|
|||
end.
|
||||
|
||||
latest_projection_epoch_number(Dir) ->
|
||||
{Epoch, _} = string:to_integer(lists:last(filelib:wildcard("*.proj", Dir))),
|
||||
Epoch.
|
||||
case filelib:wildcard("*.proj", Dir) of
|
||||
[] ->
|
||||
-1;
|
||||
Files ->
|
||||
{Epoch, _} = string:to_integer(lists:last(Files)),
|
||||
Epoch
|
||||
end.
|
||||
|
||||
project_to_chain(LPN, P) ->
|
||||
%% TODO fixme
|
||||
|
|
|
@ -39,6 +39,13 @@ append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries)
|
|||
case append_page2(P, LPN, Page) of
|
||||
lost_race ->
|
||||
append_page(P, Page, Retries - 1);
|
||||
error_badepoch ->
|
||||
case poll_for_new_epoch_projection(P) of
|
||||
{ok, NewP} ->
|
||||
append_page(NewP, Page, Retries-1);
|
||||
Else ->
|
||||
{Else, P}
|
||||
end;
|
||||
Else ->
|
||||
{Else, P}
|
||||
end
|
||||
|
@ -49,7 +56,9 @@ append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries)
|
|||
append_page(restart_sequencer(P), Page, Retries);
|
||||
exit:Exit ->
|
||||
{failed, incomplete_code, Exit}
|
||||
end.
|
||||
end;
|
||||
append_page(P, _Page, _Retries) ->
|
||||
{error_badepoch, P}.
|
||||
|
||||
append_page2(P, LPN, Page) ->
|
||||
case corfurl:write_page(P, LPN, Page) of
|
||||
|
@ -59,6 +68,8 @@ append_page2(P, LPN, Page) ->
|
|||
report_lost_race(LPN, X),
|
||||
lost_race;
|
||||
{special_trimmed, LPN}=XX ->
|
||||
XX;
|
||||
error_badepoch=XX->
|
||||
XX
|
||||
%% Let it crash: error_unwritten
|
||||
end.
|
||||
|
@ -72,10 +83,45 @@ restart_sequencer(#proj{seq={OldSequencer, _SeqHost, SeqName},
|
|||
FLUs = lists:usort(
|
||||
[FLU || R <- Ranges,
|
||||
C <- tuple_to_list(R#range.chains), FLU <- C]),
|
||||
%% TODO: We can proceed if we can seal at least one FLU in
|
||||
%% each chain. Robustify and sanity check.
|
||||
[begin
|
||||
_Res = corfurl_flu:seal(FLU, Epoch)
|
||||
end || FLU <- lists:reverse(FLUs)],
|
||||
case get(goo) of undefined -> put(goo, 0); _Q -> ok end,
|
||||
case corfurl_sequencer:start_link(FLUs, TODO_type, SeqName) of
|
||||
{ok, Pid} ->
|
||||
NewP = P#proj{seq={Pid, node(), SeqName}, epoch=Epoch+1},
|
||||
save_projection_or_get_latest(NewP)
|
||||
%% case put(goo, get(goo) + 1) of
|
||||
%% N when N < 2 ->
|
||||
%% io:format(user, "hiiiiiiiiiiiiiiiiiiiiiiiiiiiii", []),
|
||||
%% P#proj{seq={Pid, node(), SeqName}, epoch=Epoch};
|
||||
%% _ ->
|
||||
%% save_projection_or_get_latest(NewP)
|
||||
%% end
|
||||
end.
|
||||
|
||||
poll_for_new_epoch_projection(P) ->
|
||||
poll_for_new_epoch_projection(P, 25).
|
||||
|
||||
poll_for_new_epoch_projection(P, 0) ->
|
||||
%% TODO: The client that caused the seal may have crashed before
|
||||
%% writing a new projection. We should try to pick up here,
|
||||
%% write a new projection, and bully forward.
|
||||
case corfurl:latest_projection_epoch_number(P#proj.dir) of
|
||||
Neg when Neg < 0 ->
|
||||
error_badepoch;
|
||||
Other ->
|
||||
exit({bummer, ?MODULE, ?LINE, latest_epoch, Other})
|
||||
end;
|
||||
poll_for_new_epoch_projection(#proj{dir=Dir, epoch=Epoch} = P, Tries) ->
|
||||
case corfurl:latest_projection_epoch_number(Dir) of
|
||||
NewEpoch when NewEpoch > Epoch ->
|
||||
corfurl:read_projection(Dir, NewEpoch);
|
||||
_ ->
|
||||
timer:sleep(50),
|
||||
poll_for_new_epoch_projection(P, Tries - 1)
|
||||
end.
|
||||
|
||||
save_projection_or_get_latest(#proj{dir=Dir} = P) ->
|
||||
|
|
|
@ -66,7 +66,6 @@ stop(Pid) ->
|
|||
stop(Pid, Method) ->
|
||||
Res = gen_server:call(Pid, stop, infinity),
|
||||
if Method == kill ->
|
||||
io:format("stop(kill)"),
|
||||
%% Emulate gen.erl's client-side behavior when the server process
|
||||
%% is killed.
|
||||
exit(killed);
|
||||
|
|
|
@ -22,6 +22,10 @@
|
|||
|
||||
-export([delete_dir/1]).
|
||||
|
||||
-ifdef(PULSE).
|
||||
-compile({parse_transform, pulse_instrument}).
|
||||
-endif.
|
||||
|
||||
delete_dir(Dir) ->
|
||||
%% We don't recursively delete directories, the ok pattern match will fail.
|
||||
[ok = file:delete(X) || X <- filelib:wildcard(Dir ++ "/*")],
|
||||
|
|
|
@ -118,15 +118,15 @@ command(#state{run=Run} = S) ->
|
|||
|| not S#state.is_setup] ++
|
||||
[{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}}
|
||||
|| S#state.is_setup] ++
|
||||
[{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}}
|
||||
|| S#state.is_setup] ++
|
||||
%% [{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}}
|
||||
%% || S#state.is_setup] ++
|
||||
%% [{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}}
|
||||
%% || S#state.is_setup] ++
|
||||
%% [{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}}
|
||||
%% || S#state.is_setup] ++
|
||||
%% [{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}}
|
||||
%% || S#state.is_setup] ++
|
||||
[{ 1, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}}
|
||||
[{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}}
|
||||
|| S#state.is_setup] ++
|
||||
[])).
|
||||
|
||||
|
@ -162,13 +162,13 @@ eqeq(X, Y) -> {X, '/=', Y}.
|
|||
|
||||
postcondition(_S, {call, _, setup, _}, #run{} = _V) ->
|
||||
true;
|
||||
postcondition(_S, {call, _, append, _}, {ok, LPN}) when is_integer(LPN) ->
|
||||
true;
|
||||
postcondition(_S, {call, _, append, _}, {special_trimmed, LPN})
|
||||
when is_integer(LPN) ->
|
||||
true;
|
||||
postcondition(_S, {call, _, append, _}, V) ->
|
||||
eqeq(V, todoTODO_fixit);
|
||||
case V of
|
||||
{ok, LPN} when is_integer(LPN) -> true;
|
||||
{special_trimmed, LPN} when is_integer(LPN) -> true;
|
||||
error_badepoch -> true;
|
||||
_ -> eqeq(V, todoTODO_fixit)
|
||||
end;
|
||||
postcondition(_S, {call, _, read_approx, _}, V) ->
|
||||
valid_read_result(V);
|
||||
postcondition(_S, {call, _, scan_forward, _}, V) ->
|
||||
|
@ -736,10 +736,11 @@ append(#run{proj=OriginalProj}, Page) ->
|
|||
Proj = get_projection(OriginalProj),
|
||||
?LOG({append, Page},
|
||||
try
|
||||
{Res, Proj2} = (catch corfurl_client:append_page(Proj, Page)),
|
||||
{Res, Proj2} = corfurl_client:append_page(Proj, Page),
|
||||
put_projection(Proj2),
|
||||
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
|
||||
catch X:Y ->
|
||||
io:format("APPEND ~p\n", [{error, append, X, Y, erlang:get_stacktrace()}]),
|
||||
{error, append, X, Y, erlang:get_stacktrace()}
|
||||
end).
|
||||
|
||||
|
@ -811,9 +812,7 @@ trim(#run{proj=OriginalProj}, SeedInt) ->
|
|||
LPN = pick_an_LPN(Proj, SeedInt),
|
||||
?LOG({trim, LPN},
|
||||
begin
|
||||
io:format(user, "LPN = ~p\n", [LPN]),
|
||||
io:format(user, "PROJ = ~p\n", [Proj]),
|
||||
Res = (catch corfurl:trim_page(Proj, LPN)),
|
||||
Res = corfurl:trim_page(Proj, LPN),
|
||||
perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN)
|
||||
end).
|
||||
|
||||
|
|
|
@ -174,14 +174,11 @@ smoke_append_badepoch_test() ->
|
|||
[begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
|
||||
|
||||
[{ok, _} = corfurl_flu:seal(FLU, BigEpoch) || FLU <- FLUs],
|
||||
[begin
|
||||
{error_badepoch, _} = corfurl_client:append_page(P1, Pg)
|
||||
end || {_LPN, Pg} <- LPN_Pgs],
|
||||
{_LPN, Pg} = hd(LPN_Pgs),
|
||||
{error_badepoch, _} = corfurl_client:append_page(P1, Pg),
|
||||
|
||||
P2 = P1#proj{epoch=LittleEpoch},
|
||||
[begin
|
||||
{error_badepoch, _} = corfurl_client:append_page(P2, Pg)
|
||||
end || {_LPN, Pg} <- LPN_Pgs],
|
||||
{error_badepoch, _} = corfurl_client:append_page(P2, Pg),
|
||||
|
||||
ok
|
||||
after
|
||||
|
|
Loading…
Reference in a new issue