From a7dd78d8f1652ca9c5c7764878b9d45bd16d20bf Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 21 Feb 2014 18:01:43 +0900 Subject: [PATCH] Switch to Lamport clocks for PULSE verifying --- prototype/corfurl/src/corfurl_flu.erl | 77 ++++++++++++------- prototype/corfurl/src/corfurl_sequencer.erl | 18 +++-- prototype/corfurl/test/corfurl_pulse.erl | 37 +++++++-- .../corfurl/test/pulse_util/event_logger.erl | 6 +- .../corfurl/test/pulse_util/lamport_clock.erl | 67 ++++++++++++++++ 5 files changed, 160 insertions(+), 45 deletions(-) create mode 100644 prototype/corfurl/test/pulse_util/lamport_clock.erl diff --git a/prototype/corfurl/src/corfurl_flu.erl b/prototype/corfurl/src/corfurl_flu.erl index 1f0dcf6..ba370da 100644 --- a/prototype/corfurl/src/corfurl_flu.erl +++ b/prototype/corfurl/src/corfurl_flu.erl @@ -37,6 +37,7 @@ -ifdef(TEST). -export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]). +-compile(export_all). -ifdef(PULSE). -compile({parse_transform, pulse_instrument}). -endif. @@ -72,22 +73,28 @@ stop(Pid) -> write(Pid, Epoch, LogicalPN, PageBin) when is_integer(LogicalPN), LogicalPN > 0, is_binary(PageBin) -> - gen_server:call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity). + g_call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity). read(Pid, Epoch, LogicalPN) when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> - gen_server:call(Pid, {read, Epoch, LogicalPN}, infinity). + g_call(Pid, {read, Epoch, LogicalPN}, infinity). seal(Pid, Epoch) when is_integer(Epoch), Epoch > 0 -> - gen_server:call(Pid, {seal, Epoch}, infinity). + g_call(Pid, {seal, Epoch}, infinity). trim(Pid, Epoch, LogicalPN) when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> - gen_server:call(Pid, {trim, Epoch, LogicalPN}, infinity). + g_call(Pid, {trim, Epoch, LogicalPN}, infinity). fill(Pid, Epoch, LogicalPN) when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> - gen_server:call(Pid, {fill, Epoch, LogicalPN}, infinity). + g_call(Pid, {fill, Epoch, LogicalPN}, infinity). + +g_call(Pid, Arg, Timeout) -> + LC1 = lamport_clock:get(), + {Res, LC2} = gen_server:call(Pid, {Arg, LC1}, Timeout), + lamport_clock:update(LC2), + Res. -ifdef(TEST). @@ -105,6 +112,8 @@ get__trim_watermark(Pid) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% init({Dir, ExpPageSize, ExpMaxMem}) -> + lamport_clock:init(), + MemFile = memfile_path(Dir), filelib:ensure_dir(MemFile), {ok, FH} = file:open(MemFile, [read, write, raw, binary]), @@ -138,49 +147,61 @@ init({Dir, ExpPageSize, ExpMaxMem}) -> handle_call(Call, From, #state{max_logical_page=unknown} = State) -> {noreply, NewState} = handle_info(finish_init, State), handle_call(Call, From, NewState); -handle_call({write, ClientEpoch, _LogicalPN, _PageBin}, _From, +handle_call({{write, ClientEpoch, _LogicalPN, _PageBin}, LC1}, _From, #state{min_epoch=MinEpoch} = State) when ClientEpoch < MinEpoch -> - {reply, error_badepoch, State}; -handle_call({write, _ClientEpoch, LogicalPN, PageBin}, _From, + LC2 = lamport_clock:update(LC1), + {reply, {error_badepoch, LC2}, State}; +handle_call({{write, _ClientEpoch, LogicalPN, PageBin}, LC1}, _From, #state{max_logical_page=MLPN} = State) -> + LC2 = lamport_clock:update(LC1), case check_write(LogicalPN, PageBin, State) of {ok, Offset} -> ok = write_page(Offset, LogicalPN, PageBin, State), NewMLPN = erlang:max(LogicalPN, MLPN), - {reply, ok, State#state{max_logical_page=NewMLPN}}; + {reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}}; Else -> - {reply, Else, State} + {reply, {Else, LC2}, State} end; -handle_call({read, ClientEpoch, _LogicalPN}, _From, +handle_call({{read, ClientEpoch, _LogicalPN}, LC1}, _From, #state{min_epoch=MinEpoch} = State) when ClientEpoch < MinEpoch -> - {reply, error_badepoch, State}; -handle_call({read, _ClientEpoch, LogicalPN}, _From, State) -> - {reply, read_page(LogicalPN, State), State}; + LC2 = lamport_clock:update(LC1), + {reply, {error_badepoch, LC2}, State}; +handle_call({{read, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> + LC2 = lamport_clock:update(LC1), + {reply, {read_page(LogicalPN, State), LC2}, State}; -handle_call({seal, ClientEpoch}, _From, #state{min_epoch=MinEpoch} = State) +handle_call({{seal, ClientEpoch}, LC1}, _From, #state{min_epoch=MinEpoch} = State) when ClientEpoch =< MinEpoch -> - {reply, error_badepoch, State}; -handle_call({seal, ClientEpoch}, _From, #state{max_logical_page=MLPN}=State) -> + LC2 = lamport_clock:update(LC1), + {reply, {error_badepoch, LC2}, State}; +handle_call({{seal, ClientEpoch}, LC1}, _From, #state{max_logical_page=MLPN}=State) -> + LC2 = lamport_clock:update(LC1), NewState = State#state{min_epoch=ClientEpoch}, ok = write_hard_state(NewState), - {reply, {ok, MLPN}, NewState}; + {reply, {{ok, MLPN}, LC2}, NewState}; -handle_call({trim, ClientEpoch, _LogicalPN}, _From, +handle_call({{trim, ClientEpoch, _LogicalPN}, LC1}, _From, #state{min_epoch=MinEpoch} = State) when ClientEpoch < MinEpoch -> - {reply, error_badepoch, State}; -handle_call({trim, _ClientEpoch, LogicalPN}, _From, State) -> - do_trim_or_fill(trim, LogicalPN, State); + LC2 = lamport_clock:update(LC1), + {reply, {error_badepoch, LC2}, State}; +handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> + LC2 = lamport_clock:update(LC1), + {Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State), + {reply, {Reply, LC2}, NewState}; -handle_call({fill, ClientEpoch, _LogicalPN}, _From, +handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From, #state{min_epoch=MinEpoch} = State) when ClientEpoch < MinEpoch -> - {reply, error_badepoch, State}; -handle_call({fill, _ClientEpoch, LogicalPN}, _From, State) -> - do_trim_or_fill(fill, LogicalPN, State); + LC2 = lamport_clock:update(LC1), + {reply, {error_badepoch, LC2}, State}; +handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> + LC2 = lamport_clock:update(LC1), + {Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State), + {reply, {Reply, LC2}, NewState}; handle_call(get__mlp, _From, State) -> {reply, State#state.max_logical_page, State}; @@ -347,9 +368,9 @@ do_trim_or_fill(Op, LogicalPN, true -> ok end, - {reply, ok, NewS}; + {ok, NewS}; Else -> - {reply, Else, S} + {Else, S} end. trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) -> diff --git a/prototype/corfurl/src/corfurl_sequencer.erl b/prototype/corfurl/src/corfurl_sequencer.erl index 8b41040..4f14e66 100644 --- a/prototype/corfurl/src/corfurl_sequencer.erl +++ b/prototype/corfurl/src/corfurl_sequencer.erl @@ -50,11 +50,15 @@ stop(Pid) -> gen_server:call(Pid, stop, infinity). get(Pid, NumPages) -> - gen_server:call(Pid, {get, NumPages}, infinity). + {LPN, LC} = gen_server:call(Pid, {get, NumPages, lamport_clock:get()}, + infinity), + lamport_clock:update(LC), + LPN. %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% init({FLUs, TypeOrSeed}) -> + lamport_clock:init(), MLP = get_max_logical_page(FLUs), if TypeOrSeed == standard -> {ok, MLP + 1}; @@ -64,16 +68,19 @@ init({FLUs, TypeOrSeed}) -> {ok, {MLP+1, BadPercent, MaxDifference}} end. -handle_call({get, NumPages}, _From, MLP) when is_integer(MLP) -> - {reply, MLP, MLP + NumPages}; -handle_call({get, NumPages}, _From, {MLP, BadPercent, MaxDifference}) -> +handle_call({get, NumPages, LC}, _From, MLP) when is_integer(MLP) -> + NewLC = lamport_clock:update(LC), + {reply, {MLP, NewLC}, MLP + NumPages}; +handle_call({get, NumPages, LC}, _From, {MLP, BadPercent, MaxDifference}) -> + NewLC = lamport_clock:update(LC), Fudge = case random:uniform(100) of N when N < BadPercent -> random:uniform(MaxDifference * 2) - MaxDifference; _ -> 0 end, - {reply, erlang:max(1, MLP + Fudge), {MLP + NumPages, BadPercent, MaxDifference}}; + {reply, {erlang:max(1, MLP + Fudge), NewLC}, + {MLP + NumPages, BadPercent, MaxDifference}}; handle_call(stop, _From, MLP) -> {stop, normal, ok, MLP}; handle_call(_Request, _From, MLP) -> @@ -87,6 +94,7 @@ handle_info(_Info, MLP) -> {noreply, MLP}. terminate(_Reason, _MLP) -> + %% io:format(user, "C=~w,", [lamport_clock:get()]), ok. code_change(_OldVsn, MLP, _Extra) -> diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl index 7f05484..01ce1cc 100644 --- a/prototype/corfurl/test/corfurl_pulse.erl +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -46,6 +46,8 @@ eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)). -define(MAX_PAGES, 50000). +-define(MY_TAB, i_have_a_name). +-define(MY_KEY, ?MY_TAB). -record(run, { seq, % Sequencer @@ -197,6 +199,8 @@ run_commands_on_node(LocalOrSlave, Cmds, Seed) -> X = try {H, S, Res, Trace} = pulse:run(fun() -> + catch ets:new(?MY_TAB, [public, set, named_table]), + ets:insert(?MY_TAB, {?MY_KEY, undefined}), %% application:start(my_test_app), %% receive after AfterTime -> ok end, {H, S, R} = run_parallel_commands(?MODULE, Cmds), @@ -206,13 +210,15 @@ run_commands_on_node(LocalOrSlave, Cmds, Seed) -> receive after AfterTime -> ok end, Trace = event_logger:get_events(), %% receive after AfterTime -> ok end, + [{_, ThisRun}] = ets:lookup(?MY_TAB, ?MY_KEY), + [clean_up_runtime(ThisRun) || ThisRun /= undefined], + %% stop pulse controller *after* clean_up_runtime(). catch exit(pulse_application_controller, shutdown), {H, S, R, Trace} end, [{seed, Seed}, {strategy, unfair}]), Schedule = pulse:get_schedule(), Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000), - [clean_up_runtime(S) || S#state.run /= undefined], {H, S, Res, Trace, Schedule, Errors} catch _:Err -> @@ -283,7 +289,8 @@ check_trace(Trace0, _Cmds, _Seed) -> %% Also, the append might fail, so the model can ignore those %% failures because they're not mutating any state that and %% external viewer can see. - Trace = add_LPN_to_append_calls(Trace0), + %% WARNING: Trace0 + lamport_clocks means Trace0 is not strictly sorted! + Trace = add_LPN_to_append_calls(lists:sort(Trace0)), Events = eqc_temporal:from_timed_list(Trace), %% Example Events, temporal style, 1 usec resolution, same as original trace @@ -470,6 +477,7 @@ check_trace(Trace0, _Cmds, _Seed) -> end, [], FinaTtns_filtered), ?WHENFAIL(begin + ?QC_FMT("*Trace: ~p\n", [Trace]), ?QC_FMT("*ModsReads: ~p\n", [eqc_temporal:unions([Mods,Reads])]), ?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]), ?QC_FMT("*BadReads: ~p\n", [BadReads]) @@ -563,7 +571,7 @@ zipwith(F, [X|Xs], [Y|Ys]) -> [F(X, Y)|zipwith(F, Xs, Ys)]; zipwith(_, _, _) -> []. -clean_up_runtime(#state{run=R} = _S) -> +clean_up_runtime(R) -> %% io:format(user, "clean_up_runtime: run = ~p\n", [R]), catch corfurl_sequencer:stop(R#run.seq), [catch corfurl_flu:stop(F) || F <- R#run.flus], @@ -582,13 +590,16 @@ make_chains(ChainLen, [H|T], SmallAcc, BigAcc) -> end. setup(NumChains, ChainLen, PageSize, SeqType) -> + lamport_clock:init(), N = NumChains * ChainLen, FLUs = corfurl_test:setup_basic_flus(N, PageSize, ?MAX_PAGES), {ok, Seq} = corfurl_sequencer:start_link(FLUs, SeqType), Chains = make_chains(ChainLen, FLUs), %% io:format(user, "Cs = ~p\n", [Chains]), Proj = corfurl:new_simple_projection(1, 1, ?MAX_PAGES, Chains), - #run{seq=Seq, proj=Proj, flus=FLUs}. + Run = #run{seq=Seq, proj=Proj, flus=FLUs}, + ets:insert(?MY_TAB, {?MY_KEY, Run}), + Run. range_ify([]) -> []; @@ -644,12 +655,14 @@ pick_an_LPN(Seq, SeedInt) -> end. -define(LOG(Tag, MkCall), - event_logger:event(log_make_call(Tag)), + event_logger:event(log_make_call(Tag), lamport_clock:get()), LOG__Result = MkCall, - event_logger:event(log_make_result(LOG__Result)), + event_logger:event(log_make_result(LOG__Result), lamport_clock:get()), LOG__Result). append(#run{seq=Seq, proj=Proj}, Page) -> + lamport_clock:init(), + lamport_clock:incr(), ?LOG({append, Page}, begin Res = corfurl:append_page(Seq, Proj, Page), @@ -662,6 +675,8 @@ read_result_mangle(Else) -> Else. read_approx(#run{seq=Seq, proj=Proj}, SeedInt) -> + lamport_clock:init(), + lamport_clock:incr(), LPN = pick_an_LPN(Seq, SeedInt), ?LOG({read, LPN}, begin @@ -670,6 +685,8 @@ read_approx(#run{seq=Seq, proj=Proj}, SeedInt) -> end). scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) -> + lamport_clock:init(), + lamport_clock:incr(), StartLPN = if SeedInt == 1 -> 1; true -> pick_an_LPN(Seq, SeedInt) end, @@ -679,11 +696,11 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) -> %% instead from a single-page read_page() call. ?LOG({scan_forward, StartLPN, NumPages}, begin - TS1 = event_logger:timestamp(), + TS1 = lamport_clock:get(), case corfurl:scan_forward(Proj, StartLPN, NumPages) of {ok, EndLPN, MoreP, Pages} -> PageIs = lists:zip(Pages, lists:seq(1, length(Pages))), - TS2 = event_logger:timestamp(), + TS2 = lamport_clock:get(), [begin PidI = {self(), s_f, I}, event_logger:event(log_make_call(PidI, {read, LPN}), @@ -700,6 +717,8 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) -> end). fill(#run{seq=Seq, proj=Proj}, SeedInt) -> + lamport_clock:init(), + lamport_clock:incr(), LPN = pick_an_LPN(Seq, SeedInt), ?LOG({fill, LPN}, begin @@ -708,6 +727,8 @@ fill(#run{seq=Seq, proj=Proj}, SeedInt) -> end). trim(#run{seq=Seq, proj=Proj}, SeedInt) -> + lamport_clock:init(), + lamport_clock:incr(), LPN = pick_an_LPN(Seq, SeedInt), ?LOG({trim, LPN}, begin diff --git a/prototype/corfurl/test/pulse_util/event_logger.erl b/prototype/corfurl/test/pulse_util/event_logger.erl index 063ed70..8633b99 100644 --- a/prototype/corfurl/test/pulse_util/event_logger.erl +++ b/prototype/corfurl/test/pulse_util/event_logger.erl @@ -126,10 +126,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- add_event(#event{timestamp = Now, data = Data}, State) -> - Event = #event{ timestamp = Now - State#state.start_time, data = Data }, + Event = #event{ timestamp = Now, data = Data }, State#state{ events = [Event|State#state.events] }. timestamp() -> - {A, B, C} = erlang:now(), - 1000000 * (1000000 * A + B) + C. - + lamport_clock:get(). diff --git a/prototype/corfurl/test/pulse_util/lamport_clock.erl b/prototype/corfurl/test/pulse_util/lamport_clock.erl new file mode 100644 index 0000000..65878be --- /dev/null +++ b/prototype/corfurl/test/pulse_util/lamport_clock.erl @@ -0,0 +1,67 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(lamport_clock). + +-export([init/0, get/0, update/1, incr/0]). + +-define(KEY, ?MODULE). + +-ifdef(TEST). + +init() -> + case get(?KEY) of + undefined -> + %% {Ca, Cb, _} = now(), + %% FakeTOD = ((Ca * 1000000) + Cb) * 1000000, + FakeTOD = 0, + put(?KEY, FakeTOD + 1); + N when is_integer(N) -> + ok + end. + +get() -> + get(?KEY). + +update(Remote) -> + New = erlang:max(get(?KEY), Remote) + 1, + put(?KEY, New), + New. + +incr() -> + New = get(?KEY) + 1, + put(?KEY, New), + New. + +-else. % TEST + +init() -> + ok. + +get() -> + ok. + +update(_) -> + ok. + +incr() -> + ok. + +-endif. % TEST