From fb1216649c0620636a922d054470e8c429647162 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 27 Feb 2014 01:36:54 +0900 Subject: [PATCH] Finish very basic PULSE testing of stopping & restarting the sequencer --- prototype/corfurl/rebar.config.script | 3 ++ prototype/corfurl/src/corfurl.erl | 9 +++- prototype/corfurl/src/corfurl_client.erl | 48 ++++++++++++++++++++- prototype/corfurl/src/corfurl_sequencer.erl | 1 - prototype/corfurl/src/corfurl_util.erl | 4 ++ prototype/corfurl/test/corfurl_pulse.erl | 25 ++++++----- prototype/corfurl/test/corfurl_test.erl | 9 ++-- 7 files changed, 76 insertions(+), 23 deletions(-) diff --git a/prototype/corfurl/rebar.config.script b/prototype/corfurl/rebar.config.script index 79df2a3..0eb68b7 100644 --- a/prototype/corfurl/rebar.config.script +++ b/prototype/corfurl/rebar.config.script @@ -18,6 +18,9 @@ case PulseBuild of , {corfurl_flu, trim, '_'} , {corfurl_flu, fill, '_'} + , {corfurl, read_projection, '_'} + , {corfurl, save_projection, '_'} + , {prim_file, '_', '_'} , {file, '_', '_'} , {filelib, '_', '_'} diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl index 6820578..71ae149 100644 --- a/prototype/corfurl/src/corfurl.erl +++ b/prototype/corfurl/src/corfurl.erl @@ -320,8 +320,13 @@ save_projection(Dir, #proj{epoch=Epoch} = P) -> end. latest_projection_epoch_number(Dir) -> - {Epoch, _} = string:to_integer(lists:last(filelib:wildcard("*.proj", Dir))), - Epoch. + case filelib:wildcard("*.proj", Dir) of + [] -> + -1; + Files -> + {Epoch, _} = string:to_integer(lists:last(Files)), + Epoch + end. project_to_chain(LPN, P) -> %% TODO fixme diff --git a/prototype/corfurl/src/corfurl_client.erl b/prototype/corfurl/src/corfurl_client.erl index df01512..37805e7 100644 --- a/prototype/corfurl/src/corfurl_client.erl +++ b/prototype/corfurl/src/corfurl_client.erl @@ -39,6 +39,13 @@ append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries) case append_page2(P, LPN, Page) of lost_race -> append_page(P, Page, Retries - 1); + error_badepoch -> + case poll_for_new_epoch_projection(P) of + {ok, NewP} -> + append_page(NewP, Page, Retries-1); + Else -> + {Else, P} + end; Else -> {Else, P} end @@ -49,7 +56,9 @@ append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries) append_page(restart_sequencer(P), Page, Retries); exit:Exit -> {failed, incomplete_code, Exit} - end. + end; +append_page(P, _Page, _Retries) -> + {error_badepoch, P}. append_page2(P, LPN, Page) -> case corfurl:write_page(P, LPN, Page) of @@ -59,6 +68,8 @@ append_page2(P, LPN, Page) -> report_lost_race(LPN, X), lost_race; {special_trimmed, LPN}=XX -> + XX; + error_badepoch=XX-> XX %% Let it crash: error_unwritten end. @@ -72,10 +83,45 @@ restart_sequencer(#proj{seq={OldSequencer, _SeqHost, SeqName}, FLUs = lists:usort( [FLU || R <- Ranges, C <- tuple_to_list(R#range.chains), FLU <- C]), + %% TODO: We can proceed if we can seal at least one FLU in + %% each chain. Robustify and sanity check. + [begin + _Res = corfurl_flu:seal(FLU, Epoch) + end || FLU <- lists:reverse(FLUs)], + case get(goo) of undefined -> put(goo, 0); _Q -> ok end, case corfurl_sequencer:start_link(FLUs, TODO_type, SeqName) of {ok, Pid} -> NewP = P#proj{seq={Pid, node(), SeqName}, epoch=Epoch+1}, save_projection_or_get_latest(NewP) + %% case put(goo, get(goo) + 1) of + %% N when N < 2 -> + %% io:format(user, "hiiiiiiiiiiiiiiiiiiiiiiiiiiiii", []), + %% P#proj{seq={Pid, node(), SeqName}, epoch=Epoch}; + %% _ -> + %% save_projection_or_get_latest(NewP) + %% end + end. + +poll_for_new_epoch_projection(P) -> + poll_for_new_epoch_projection(P, 25). + +poll_for_new_epoch_projection(P, 0) -> + %% TODO: The client that caused the seal may have crashed before + %% writing a new projection. We should try to pick up here, + %% write a new projection, and bully forward. + case corfurl:latest_projection_epoch_number(P#proj.dir) of + Neg when Neg < 0 -> + error_badepoch; + Other -> + exit({bummer, ?MODULE, ?LINE, latest_epoch, Other}) + end; +poll_for_new_epoch_projection(#proj{dir=Dir, epoch=Epoch} = P, Tries) -> + case corfurl:latest_projection_epoch_number(Dir) of + NewEpoch when NewEpoch > Epoch -> + corfurl:read_projection(Dir, NewEpoch); + _ -> + timer:sleep(50), + poll_for_new_epoch_projection(P, Tries - 1) end. save_projection_or_get_latest(#proj{dir=Dir} = P) -> diff --git a/prototype/corfurl/src/corfurl_sequencer.erl b/prototype/corfurl/src/corfurl_sequencer.erl index 1f7a3d3..429e8d8 100644 --- a/prototype/corfurl/src/corfurl_sequencer.erl +++ b/prototype/corfurl/src/corfurl_sequencer.erl @@ -66,7 +66,6 @@ stop(Pid) -> stop(Pid, Method) -> Res = gen_server:call(Pid, stop, infinity), if Method == kill -> - io:format("stop(kill)"), %% Emulate gen.erl's client-side behavior when the server process %% is killed. exit(killed); diff --git a/prototype/corfurl/src/corfurl_util.erl b/prototype/corfurl/src/corfurl_util.erl index c88da23..7a69055 100644 --- a/prototype/corfurl/src/corfurl_util.erl +++ b/prototype/corfurl/src/corfurl_util.erl @@ -22,6 +22,10 @@ -export([delete_dir/1]). +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-endif. + delete_dir(Dir) -> %% We don't recursively delete directories, the ok pattern match will fail. [ok = file:delete(X) || X <- filelib:wildcard(Dir ++ "/*")], diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl index 5330080..f6ae41e 100644 --- a/prototype/corfurl/test/corfurl_pulse.erl +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -118,15 +118,15 @@ command(#state{run=Run} = S) -> || not S#state.is_setup] ++ [{50, {call, ?MODULE, append, [Run, gen_page(PageSize)]}} || S#state.is_setup] ++ - [{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}} - || S#state.is_setup] ++ + %% [{15, {call, ?MODULE, read_approx, [Run, gen_approx_page()]}} + %% || S#state.is_setup] ++ %% [{15, {call, ?MODULE, scan_forward, [Run, gen_scan_forward_start(), nat()]}} %% || S#state.is_setup] ++ %% [{12, {call, ?MODULE, fill, [Run, gen_approx_page()]}} %% || S#state.is_setup] ++ %% [{12, {call, ?MODULE, trim, [Run, gen_approx_page()]}} %% || S#state.is_setup] ++ - [{ 1, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}} + [{10, {call, ?MODULE, stop_sequencer, [Run, gen_stop_method()]}} || S#state.is_setup] ++ [])). @@ -162,13 +162,13 @@ eqeq(X, Y) -> {X, '/=', Y}. postcondition(_S, {call, _, setup, _}, #run{} = _V) -> true; -postcondition(_S, {call, _, append, _}, {ok, LPN}) when is_integer(LPN) -> - true; -postcondition(_S, {call, _, append, _}, {special_trimmed, LPN}) - when is_integer(LPN) -> - true; postcondition(_S, {call, _, append, _}, V) -> - eqeq(V, todoTODO_fixit); + case V of + {ok, LPN} when is_integer(LPN) -> true; + {special_trimmed, LPN} when is_integer(LPN) -> true; + error_badepoch -> true; + _ -> eqeq(V, todoTODO_fixit) + end; postcondition(_S, {call, _, read_approx, _}, V) -> valid_read_result(V); postcondition(_S, {call, _, scan_forward, _}, V) -> @@ -736,10 +736,11 @@ append(#run{proj=OriginalProj}, Page) -> Proj = get_projection(OriginalProj), ?LOG({append, Page}, try - {Res, Proj2} = (catch corfurl_client:append_page(Proj, Page)), + {Res, Proj2} = corfurl_client:append_page(Proj, Page), put_projection(Proj2), perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page) catch X:Y -> + io:format("APPEND ~p\n", [{error, append, X, Y, erlang:get_stacktrace()}]), {error, append, X, Y, erlang:get_stacktrace()} end). @@ -811,9 +812,7 @@ trim(#run{proj=OriginalProj}, SeedInt) -> LPN = pick_an_LPN(Proj, SeedInt), ?LOG({trim, LPN}, begin -io:format(user, "LPN = ~p\n", [LPN]), -io:format(user, "PROJ = ~p\n", [Proj]), - Res = (catch corfurl:trim_page(Proj, LPN)), + Res = corfurl:trim_page(Proj, LPN), perhaps_trip_trim_page(?TRIP_bad_trim, Res, LPN) end). diff --git a/prototype/corfurl/test/corfurl_test.erl b/prototype/corfurl/test/corfurl_test.erl index 0051471..45423eb 100644 --- a/prototype/corfurl/test/corfurl_test.erl +++ b/prototype/corfurl/test/corfurl_test.erl @@ -174,14 +174,11 @@ smoke_append_badepoch_test() -> [begin {{ok, LPN}, _} = corfurl_client:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs], [{ok, _} = corfurl_flu:seal(FLU, BigEpoch) || FLU <- FLUs], - [begin - {error_badepoch, _} = corfurl_client:append_page(P1, Pg) - end || {_LPN, Pg} <- LPN_Pgs], + {_LPN, Pg} = hd(LPN_Pgs), + {error_badepoch, _} = corfurl_client:append_page(P1, Pg), P2 = P1#proj{epoch=LittleEpoch}, - [begin - {error_badepoch, _} = corfurl_client:append_page(P2, Pg) - end || {_LPN, Pg} <- LPN_Pgs], + {error_badepoch, _} = corfurl_client:append_page(P2, Pg), ok after