Switch to Lamport clocks for PULSE verifying

This commit is contained in:
Scott Lystig Fritchie 2014-02-21 18:01:43 +09:00
parent 5420e9ca1f
commit a7dd78d8f1
5 changed files with 160 additions and 45 deletions

View file

@ -37,6 +37,7 @@
-ifdef(TEST).
-export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]).
-compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
@ -72,22 +73,28 @@ stop(Pid) ->
write(Pid, Epoch, LogicalPN, PageBin)
when is_integer(LogicalPN), LogicalPN > 0, is_binary(PageBin) ->
gen_server:call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity).
g_call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity).
read(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
gen_server:call(Pid, {read, Epoch, LogicalPN}, infinity).
g_call(Pid, {read, Epoch, LogicalPN}, infinity).
seal(Pid, Epoch) when is_integer(Epoch), Epoch > 0 ->
gen_server:call(Pid, {seal, Epoch}, infinity).
g_call(Pid, {seal, Epoch}, infinity).
trim(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
gen_server:call(Pid, {trim, Epoch, LogicalPN}, infinity).
g_call(Pid, {trim, Epoch, LogicalPN}, infinity).
fill(Pid, Epoch, LogicalPN)
when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
gen_server:call(Pid, {fill, Epoch, LogicalPN}, infinity).
g_call(Pid, {fill, Epoch, LogicalPN}, infinity).
g_call(Pid, Arg, Timeout) ->
LC1 = lamport_clock:get(),
{Res, LC2} = gen_server:call(Pid, {Arg, LC1}, Timeout),
lamport_clock:update(LC2),
Res.
-ifdef(TEST).
@ -105,6 +112,8 @@ get__trim_watermark(Pid) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init({Dir, ExpPageSize, ExpMaxMem}) ->
lamport_clock:init(),
MemFile = memfile_path(Dir),
filelib:ensure_dir(MemFile),
{ok, FH} = file:open(MemFile, [read, write, raw, binary]),
@ -138,49 +147,61 @@ init({Dir, ExpPageSize, ExpMaxMem}) ->
handle_call(Call, From, #state{max_logical_page=unknown} = State) ->
{noreply, NewState} = handle_info(finish_init, State),
handle_call(Call, From, NewState);
handle_call({write, ClientEpoch, _LogicalPN, _PageBin}, _From,
handle_call({{write, ClientEpoch, _LogicalPN, _PageBin}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({write, _ClientEpoch, LogicalPN, PageBin}, _From,
LC2 = lamport_clock:update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{write, _ClientEpoch, LogicalPN, PageBin}, LC1}, _From,
#state{max_logical_page=MLPN} = State) ->
LC2 = lamport_clock:update(LC1),
case check_write(LogicalPN, PageBin, State) of
{ok, Offset} ->
ok = write_page(Offset, LogicalPN, PageBin, State),
NewMLPN = erlang:max(LogicalPN, MLPN),
{reply, ok, State#state{max_logical_page=NewMLPN}};
{reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}};
Else ->
{reply, Else, State}
{reply, {Else, LC2}, State}
end;
handle_call({read, ClientEpoch, _LogicalPN}, _From,
handle_call({{read, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({read, _ClientEpoch, LogicalPN}, _From, State) ->
{reply, read_page(LogicalPN, State), State};
LC2 = lamport_clock:update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{read, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lamport_clock:update(LC1),
{reply, {read_page(LogicalPN, State), LC2}, State};
handle_call({seal, ClientEpoch}, _From, #state{min_epoch=MinEpoch} = State)
handle_call({{seal, ClientEpoch}, LC1}, _From, #state{min_epoch=MinEpoch} = State)
when ClientEpoch =< MinEpoch ->
{reply, error_badepoch, State};
handle_call({seal, ClientEpoch}, _From, #state{max_logical_page=MLPN}=State) ->
LC2 = lamport_clock:update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{seal, ClientEpoch}, LC1}, _From, #state{max_logical_page=MLPN}=State) ->
LC2 = lamport_clock:update(LC1),
NewState = State#state{min_epoch=ClientEpoch},
ok = write_hard_state(NewState),
{reply, {ok, MLPN}, NewState};
{reply, {{ok, MLPN}, LC2}, NewState};
handle_call({trim, ClientEpoch, _LogicalPN}, _From,
handle_call({{trim, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({trim, _ClientEpoch, LogicalPN}, _From, State) ->
do_trim_or_fill(trim, LogicalPN, State);
LC2 = lamport_clock:update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lamport_clock:update(LC1),
{Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State),
{reply, {Reply, LC2}, NewState};
handle_call({fill, ClientEpoch, _LogicalPN}, _From,
handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({fill, _ClientEpoch, LogicalPN}, _From, State) ->
do_trim_or_fill(fill, LogicalPN, State);
LC2 = lamport_clock:update(LC1),
{reply, {error_badepoch, LC2}, State};
handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lamport_clock:update(LC1),
{Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State),
{reply, {Reply, LC2}, NewState};
handle_call(get__mlp, _From, State) ->
{reply, State#state.max_logical_page, State};
@ -347,9 +368,9 @@ do_trim_or_fill(Op, LogicalPN,
true ->
ok
end,
{reply, ok, NewS};
{ok, NewS};
Else ->
{reply, Else, S}
{Else, S}
end.
trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) ->

View file

@ -50,11 +50,15 @@ stop(Pid) ->
gen_server:call(Pid, stop, infinity).
get(Pid, NumPages) ->
gen_server:call(Pid, {get, NumPages}, infinity).
{LPN, LC} = gen_server:call(Pid, {get, NumPages, lamport_clock:get()},
infinity),
lamport_clock:update(LC),
LPN.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init({FLUs, TypeOrSeed}) ->
lamport_clock:init(),
MLP = get_max_logical_page(FLUs),
if TypeOrSeed == standard ->
{ok, MLP + 1};
@ -64,16 +68,19 @@ init({FLUs, TypeOrSeed}) ->
{ok, {MLP+1, BadPercent, MaxDifference}}
end.
handle_call({get, NumPages}, _From, MLP) when is_integer(MLP) ->
{reply, MLP, MLP + NumPages};
handle_call({get, NumPages}, _From, {MLP, BadPercent, MaxDifference}) ->
handle_call({get, NumPages, LC}, _From, MLP) when is_integer(MLP) ->
NewLC = lamport_clock:update(LC),
{reply, {MLP, NewLC}, MLP + NumPages};
handle_call({get, NumPages, LC}, _From, {MLP, BadPercent, MaxDifference}) ->
NewLC = lamport_clock:update(LC),
Fudge = case random:uniform(100) of
N when N < BadPercent ->
random:uniform(MaxDifference * 2) - MaxDifference;
_ ->
0
end,
{reply, erlang:max(1, MLP + Fudge), {MLP + NumPages, BadPercent, MaxDifference}};
{reply, {erlang:max(1, MLP + Fudge), NewLC},
{MLP + NumPages, BadPercent, MaxDifference}};
handle_call(stop, _From, MLP) ->
{stop, normal, ok, MLP};
handle_call(_Request, _From, MLP) ->
@ -87,6 +94,7 @@ handle_info(_Info, MLP) ->
{noreply, MLP}.
terminate(_Reason, _MLP) ->
%% io:format(user, "C=~w,", [lamport_clock:get()]),
ok.
code_change(_OldVsn, MLP, _Extra) ->

View file

@ -46,6 +46,8 @@
eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-define(MAX_PAGES, 50000).
-define(MY_TAB, i_have_a_name).
-define(MY_KEY, ?MY_TAB).
-record(run, {
seq, % Sequencer
@ -197,6 +199,8 @@ run_commands_on_node(LocalOrSlave, Cmds, Seed) ->
X =
try
{H, S, Res, Trace} = pulse:run(fun() ->
catch ets:new(?MY_TAB, [public, set, named_table]),
ets:insert(?MY_TAB, {?MY_KEY, undefined}),
%% application:start(my_test_app),
%% receive after AfterTime -> ok end,
{H, S, R} = run_parallel_commands(?MODULE, Cmds),
@ -206,13 +210,15 @@ run_commands_on_node(LocalOrSlave, Cmds, Seed) ->
receive after AfterTime -> ok end,
Trace = event_logger:get_events(),
%% receive after AfterTime -> ok end,
[{_, ThisRun}] = ets:lookup(?MY_TAB, ?MY_KEY),
[clean_up_runtime(ThisRun) || ThisRun /= undefined],
%% stop pulse controller *after* clean_up_runtime().
catch exit(pulse_application_controller, shutdown),
{H, S, R, Trace}
end, [{seed, Seed},
{strategy, unfair}]),
Schedule = pulse:get_schedule(),
Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000),
[clean_up_runtime(S) || S#state.run /= undefined],
{H, S, Res, Trace, Schedule, Errors}
catch
_:Err ->
@ -283,7 +289,8 @@ check_trace(Trace0, _Cmds, _Seed) ->
%% Also, the append might fail, so the model can ignore those
%% failures because they're not mutating any state that and
%% external viewer can see.
Trace = add_LPN_to_append_calls(Trace0),
%% WARNING: Trace0 + lamport_clocks means Trace0 is not strictly sorted!
Trace = add_LPN_to_append_calls(lists:sort(Trace0)),
Events = eqc_temporal:from_timed_list(Trace),
%% Example Events, temporal style, 1 usec resolution, same as original trace
@ -470,6 +477,7 @@ check_trace(Trace0, _Cmds, _Seed) ->
end, [], FinaTtns_filtered),
?WHENFAIL(begin
?QC_FMT("*Trace: ~p\n", [Trace]),
?QC_FMT("*ModsReads: ~p\n", [eqc_temporal:unions([Mods,Reads])]),
?QC_FMT("*InvalidTtns: ~p\n", [InvalidTransitions]),
?QC_FMT("*BadReads: ~p\n", [BadReads])
@ -563,7 +571,7 @@ zipwith(F, [X|Xs], [Y|Ys]) ->
[F(X, Y)|zipwith(F, Xs, Ys)];
zipwith(_, _, _) -> [].
clean_up_runtime(#state{run=R} = _S) ->
clean_up_runtime(R) ->
%% io:format(user, "clean_up_runtime: run = ~p\n", [R]),
catch corfurl_sequencer:stop(R#run.seq),
[catch corfurl_flu:stop(F) || F <- R#run.flus],
@ -582,13 +590,16 @@ make_chains(ChainLen, [H|T], SmallAcc, BigAcc) ->
end.
setup(NumChains, ChainLen, PageSize, SeqType) ->
lamport_clock:init(),
N = NumChains * ChainLen,
FLUs = corfurl_test:setup_basic_flus(N, PageSize, ?MAX_PAGES),
{ok, Seq} = corfurl_sequencer:start_link(FLUs, SeqType),
Chains = make_chains(ChainLen, FLUs),
%% io:format(user, "Cs = ~p\n", [Chains]),
Proj = corfurl:new_simple_projection(1, 1, ?MAX_PAGES, Chains),
#run{seq=Seq, proj=Proj, flus=FLUs}.
Run = #run{seq=Seq, proj=Proj, flus=FLUs},
ets:insert(?MY_TAB, {?MY_KEY, Run}),
Run.
range_ify([]) ->
[];
@ -644,12 +655,14 @@ pick_an_LPN(Seq, SeedInt) ->
end.
-define(LOG(Tag, MkCall),
event_logger:event(log_make_call(Tag)),
event_logger:event(log_make_call(Tag), lamport_clock:get()),
LOG__Result = MkCall,
event_logger:event(log_make_result(LOG__Result)),
event_logger:event(log_make_result(LOG__Result), lamport_clock:get()),
LOG__Result).
append(#run{seq=Seq, proj=Proj}, Page) ->
lamport_clock:init(),
lamport_clock:incr(),
?LOG({append, Page},
begin
Res = corfurl:append_page(Seq, Proj, Page),
@ -662,6 +675,8 @@ read_result_mangle(Else) ->
Else.
read_approx(#run{seq=Seq, proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
?LOG({read, LPN},
begin
@ -670,6 +685,8 @@ read_approx(#run{seq=Seq, proj=Proj}, SeedInt) ->
end).
scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) ->
lamport_clock:init(),
lamport_clock:incr(),
StartLPN = if SeedInt == 1 -> 1;
true -> pick_an_LPN(Seq, SeedInt)
end,
@ -679,11 +696,11 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) ->
%% instead from a single-page read_page() call.
?LOG({scan_forward, StartLPN, NumPages},
begin
TS1 = event_logger:timestamp(),
TS1 = lamport_clock:get(),
case corfurl:scan_forward(Proj, StartLPN, NumPages) of
{ok, EndLPN, MoreP, Pages} ->
PageIs = lists:zip(Pages, lists:seq(1, length(Pages))),
TS2 = event_logger:timestamp(),
TS2 = lamport_clock:get(),
[begin
PidI = {self(), s_f, I},
event_logger:event(log_make_call(PidI, {read, LPN}),
@ -700,6 +717,8 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) ->
end).
fill(#run{seq=Seq, proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
?LOG({fill, LPN},
begin
@ -708,6 +727,8 @@ fill(#run{seq=Seq, proj=Proj}, SeedInt) ->
end).
trim(#run{seq=Seq, proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
?LOG({trim, LPN},
begin

View file

@ -126,10 +126,8 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now - State#state.start_time, data = Data },
Event = #event{ timestamp = Now, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
{A, B, C} = erlang:now(),
1000000 * (1000000 * A + B) + C.
lamport_clock:get().

View file

@ -0,0 +1,67 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(lamport_clock).
-export([init/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
-ifdef(TEST).
init() ->
case get(?KEY) of
undefined ->
%% {Ca, Cb, _} = now(),
%% FakeTOD = ((Ca * 1000000) + Cb) * 1000000,
FakeTOD = 0,
put(?KEY, FakeTOD + 1);
N when is_integer(N) ->
ok
end.
get() ->
get(?KEY).
update(Remote) ->
New = erlang:max(get(?KEY), Remote) + 1,
put(?KEY, New),
New.
incr() ->
New = get(?KEY) + 1,
put(?KEY, New),
New.
-else. % TEST
init() ->
ok.
get() ->
ok.
update(_) ->
ok.
incr() ->
ok.
-endif. % TEST