WIP: eunit tests pass, but Concuerror loops forever then errs on max retries on proj0_test

This commit is contained in:
Scott Lystig Fritchie 2014-09-28 23:33:22 +09:00
parent f7447e8953
commit 97c5789b44
4 changed files with 244 additions and 78 deletions

View file

@ -0,0 +1,6 @@
-record(proj, { % Projection
epoch :: non_neg_integer(),
all :: list(pid()),
active :: list(pid())
}).

View file

@ -2,9 +2,13 @@
-behaviour(gen_server).
-include("machi.hrl").
-export([start_link/1, stop/1,
write/3, read/2, trim/2,
proj_write/3, proj_read/2, proj_get_latest_num/1, proj_read_latest/1]).
-export([make_proj/1, make_proj/2]).
-ifdef(TEST).
-compile(export_all).
-endif.
@ -31,7 +35,7 @@
name :: list(),
wedged = false :: boolean(),
register = 'unwritten' :: register(),
proj_num :: non_neg_integer(),
proj_epoch :: non_neg_integer(),
proj_store :: dict()
}).
@ -41,20 +45,20 @@ start_link(Name) when is_list(Name) ->
stop(Pid) ->
g_call(Pid, stop, infinity).
read(Pid, ProjNum) ->
g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME).
read(Pid, Epoch) ->
g_call(Pid, {reg_op, Epoch, read}, ?LONG_TIME).
write(Pid, ProjNum, Bin) when is_binary(Bin) ->
g_call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME).
write(Pid, Epoch, Bin) ->
g_call(Pid, {reg_op, Epoch, {write, Bin}}, ?LONG_TIME).
trim(Pid, ProjNum) ->
g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
trim(Pid, Epoch) ->
g_call(Pid, {reg_op, Epoch, trim}, ?LONG_TIME).
proj_write(Pid, ProjNum, Proj) ->
g_call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
proj_write(Pid, Epoch, Proj) ->
g_call(Pid, {proj_write, Epoch, Proj}, ?LONG_TIME).
proj_read(Pid, ProjNum) ->
g_call(Pid, {proj_read, ProjNum}, ?LONG_TIME).
proj_read(Pid, Epoch) ->
g_call(Pid, {proj_read, Epoch}, ?LONG_TIME).
proj_get_latest_num(Pid) ->
g_call(Pid, {proj_get_latest_num}, ?LONG_TIME).
@ -70,62 +74,70 @@ g_call(Pid, Arg, Timeout) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
make_proj(FLUs) ->
make_proj(1, FLUs).
make_proj(Epoch, FLUs) ->
#proj{epoch=Epoch, all=FLUs, active=FLUs}.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init([Name]) ->
lclock_init(),
{ok, #state{name=Name,
proj_num=-42,
proj_epoch=-42,
proj_store=orddict:new()}}.
handle_call({{reg_op, _ProjNum, _}, LC1}, _From, #state{wedged=true} = S) ->
handle_call({{reg_op, _Epoch, _}, LC1}, _From, #state{wedged=true} = S) ->
LC2 = lclock_update(LC1),
{reply, {error_wedged, LC2}, S};
handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum < MyProjNum ->
handle_call({{reg_op, Epoch, _}, LC1}, _From, #state{proj_epoch=MyEpoch} = S)
when Epoch < MyEpoch ->
LC2 = lclock_update(LC1),
{reply, {{error_stale_projection, MyProjNum}, LC2}, S};
handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum > MyProjNum ->
{reply, {{error_stale_projection, MyEpoch}, LC2}, S};
handle_call({{reg_op, Epoch, _}, LC1}, _From, #state{proj_epoch=MyEpoch} = S)
when Epoch > MyEpoch ->
LC2 = lclock_update(LC1),
{reply, {error_wedged, LC2}, S#state{wedged=true}};
handle_call({{reg_op, _ProjNum, {write, Bin}}, LC1}, _From,
handle_call({{reg_op, _Epoch, {write, Bin}}, LC1}, _From,
#state{register=unwritten} = S) ->
LC2 = lclock_update(LC1),
{reply, {ok, LC2}, S#state{register=Bin}};
handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From,
handle_call({{reg_op, _Epoch, {write, _Bin}}, LC1}, _From,
#state{register=B} = S) when is_binary(B) ->
LC2 = lclock_update(LC1),
{reply, {error_written, LC2}, S};
handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From,
handle_call({{reg_op, _Epoch, {write, _Bin}}, LC1}, _From,
#state{register=trimmed} = S) ->
LC2 = lclock_update(LC1),
{reply, {error_trimmed, LC2}, S};
handle_call({{reg_op, ProjNum, read}, LC1}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum /= MyProjNum ->
handle_call({{reg_op, Epoch, read}, LC1}, _From, #state{proj_epoch=MyEpoch} = S)
when Epoch /= MyEpoch ->
LC2 = lclock_update(LC1),
{reply, {{error_stale_projection, MyProjNum}, LC2}, S};
handle_call({{reg_op, _ProjNum, read}, LC1}, _From, #state{register=Reg} = S) ->
{reply, {{error_stale_projection, MyEpoch}, LC2}, S};
handle_call({{reg_op, _Epoch, read}, LC1}, _From, #state{register=Reg} = S) ->
LC2 = lclock_update(LC1),
{reply, {{ok, Reg}, LC2}, S};
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=unwritten} = S) ->
handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=unwritten} = S) ->
LC2 = lclock_update(LC1),
{reply, {ok, LC2}, S#state{register=trimmed}};
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) ->
handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) ->
LC2 = lclock_update(LC1),
{reply, {ok, LC2}, S#state{register=trimmed}};
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S) ->
handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=trimmed} = S) ->
LC2 = lclock_update(LC1),
{reply, {error_trimmed, LC2}, S};
handle_call({{proj_write, ProjNum, Proj}, LC1}, _From, S) ->
handle_call({{proj_write, Epoch, Proj}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
{Reply, NewS} = do_proj_write(Epoch, Proj, S),
{reply, {Reply, LC2}, NewS};
handle_call({{proj_read, ProjNum}, LC1}, _From, S) ->
handle_call({{proj_read, Epoch}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_read(ProjNum, S),
{Reply, NewS} = do_proj_read(Epoch, S),
{reply, {Reply, LC2}, NewS};
handle_call({{proj_get_latest_num}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
@ -136,8 +148,8 @@ handle_call({{proj_read_latest}, LC1}, _From, S) ->
case do_proj_get_latest_num(S) of
{error_unwritten, _S} ->
{reply, {error_unwritten, LC2}, S};
{{ok, ProjNum}, _S} ->
Proj = orddict:fetch(ProjNum, S#state.proj_store),
{{ok, Epoch}, _S} ->
Proj = orddict:fetch(Epoch, S#state.proj_store),
{reply, {{ok, Proj}, LC2}, S}
end;
handle_call({stop, LC1}, _From, MLP) ->
@ -161,25 +173,25 @@ code_change(_OldVsn, MLP, _Extra) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
do_proj_write(ProjNum, Proj, #state{proj_num=MyProjNum, proj_store=D,
do_proj_write(Epoch, Proj, #state{proj_epoch=MyEpoch, proj_store=D,
wedged=MyWedged} = S) ->
case orddict:find(ProjNum, D) of
case orddict:find(Epoch, D) of
error ->
D2 = orddict:store(ProjNum, Proj, D),
{NewProjNum, NewWedged} = if ProjNum > MyProjNum ->
{ProjNum, false};
D2 = orddict:store(Epoch, Proj, D),
{NewEpoch, NewWedged} = if Epoch > MyEpoch ->
{Epoch, false};
true ->
{MyProjNum, MyWedged}
{MyEpoch, MyWedged}
end,
{ok, S#state{wedged=NewWedged,
proj_num=NewProjNum,
proj_epoch=NewEpoch,
proj_store=D2}};
{ok, _} ->
{error_written, S}
end.
do_proj_read(ProjNum, #state{proj_store=D} = S) ->
case orddict:find(ProjNum, D) of
do_proj_read(Epoch, #state{proj_store=D} = S) ->
case orddict:find(Epoch, D) of
error ->
{error_unwritten, S};
{ok, Proj} ->
@ -191,8 +203,8 @@ do_proj_get_latest_num(#state{proj_store=D} = S) ->
[] ->
{error_unwritten, S};
L ->
{ProjNum, _Proj} = lists:last(L),
{{ok, ProjNum}, S}
{Epoch, _Proj} = lists:last(L),
{{ok, Epoch}, S}
end.
-ifdef(TEST).

View file

@ -1,6 +1,8 @@
-module(machi_flu0_test).
-include("machi.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
@ -32,21 +34,21 @@ concuerror3_test() ->
concuerror4_test() ->
event_setup(),
{ok, F1} = machi_flu0:start_link("one"),
ProjNum = 1,
ok = m_proj_write(F1, ProjNum, dontcare),
Epoch = 1,
ok = m_proj_write(F1, Epoch, dontcare),
Val = <<"val!">>,
ok = m_write(F1, ProjNum, Val),
{error_stale_projection, ProjNum} = m_write(F1, ProjNum - 1, Val),
ok = m_write(F1, Epoch, Val),
{error_stale_projection, Epoch} = m_write(F1, Epoch - 1, Val),
Me = self(),
TrimFun = fun() -> Res = m_trim(F1, ProjNum),
TrimFun = fun() -> Res = m_trim(F1, Epoch),
Me ! {self(), Res}
end,
TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)],
TrimExpected = [error_trimmed,error_trimmed,ok],
GetFun = fun() -> Res = m_read(F1, ProjNum),
GetFun = fun() -> Res = m_read(F1, Epoch),
Me ! {self(), Res}
end,
GetPids = [spawn(GetFun)],
@ -87,44 +89,77 @@ proj_store_test() ->
wedge_test() ->
event_setup(),
{ok, F1} = machi_flu0:start_link("one"),
ProjNum1 = 1,
ok = m_proj_write(F1, ProjNum1, dontcare),
Epoch1 = 1,
ok = m_proj_write(F1, Epoch1, dontcare),
Val = <<"val!">>,
ok = m_write(F1, ProjNum1, Val),
{error_stale_projection, ProjNum1} = m_write(F1, ProjNum1 - 1, Val),
error_wedged = m_write(F1, ProjNum1 + 1, Val),
ok = m_write(F1, Epoch1, Val),
{error_stale_projection, Epoch1} = m_write(F1, Epoch1 - 1, Val),
error_wedged = m_write(F1, Epoch1 + 1, Val),
%% Until we write a newer/bigger projection, all ops are error_wedged
error_wedged = m_read(F1, ProjNum1),
error_wedged = m_write(F1, ProjNum1, Val),
error_wedged = m_trim(F1, ProjNum1),
error_wedged = m_read(F1, Epoch1),
error_wedged = m_write(F1, Epoch1, Val),
error_wedged = m_trim(F1, Epoch1),
ProjNum2 = ProjNum1 + 1,
ok = m_proj_write(F1, ProjNum2, dontcare),
{ok, Val} = m_read(F1, ProjNum2),
error_written = m_write(F1, ProjNum2, Val),
ok = m_trim(F1, ProjNum2),
error_trimmed = m_trim(F1, ProjNum2),
Epoch2 = Epoch1 + 1,
ok = m_proj_write(F1, Epoch2, dontcare),
{ok, Val} = m_read(F1, Epoch2),
error_written = m_write(F1, Epoch2, Val),
ok = m_trim(F1, Epoch2),
error_trimmed = m_trim(F1, Epoch2),
ok = m_stop(F1),
_XX = event_get_all(), io:format(user, "XX ~p\n", [_XX]),
event_shutdown(),
ok.
proj0_test() ->
Me = self(),
event_setup(),
{ok, F1} = machi_flu0:start_link("one"),
{ok, F2} = machi_flu0:start_link("two"),
FLUs = [F1, F2],
FirstProj = machi_flu0:make_proj(1, FLUs),
Epoch1 = FirstProj#proj.epoch,
[ok = m_proj_write(F, Epoch1, FirstProj) || F <- FLUs],
Proj0 = machi_flu0:make_proj(-42, FLUs),
Val = <<"val!">>,
Pid1 = spawn(fun() ->
{ok, _Proj1} = m_append_page(Proj0, Val),
Me ! {self(), done}
end),
%% Pids = [Pid1],
SecondProj = machi_flu0:make_proj(2, FLUs),
Epoch2 = SecondProj#proj.epoch,
Pid2 = spawn(fun() ->
[ok = m_proj_write(F, Epoch2, SecondProj) || F <- FLUs],
Me ! {self(), done}
end),
Pids = [Pid1, Pid2],
[receive {Pid, _} -> ok end || Pid <- Pids],
[ok = m_stop(F) || F <- FLUs],
_XX = event_get_all(), %% io:format(user, "XX ~p\n", [_XX]),
event_shutdown(),
ok.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
m_write(Pid, ProjNum1, Val) ->
Res = machi_flu0:write(Pid, ProjNum1, Val),
m_write(Pid, Epoch1, Val) ->
Res = machi_flu0:write(Pid, Epoch1, Val),
event_add(write, Pid, Res),
Res.
m_read(Pid, ProjNum) ->
Res = machi_flu0:read(Pid, ProjNum),
m_read(Pid, Epoch) ->
Res = machi_flu0:read(Pid, Epoch),
event_add(get, Pid, Res),
Res.
m_trim(Pid, ProjNum) ->
Res = machi_flu0:trim(Pid, ProjNum),
m_trim(Pid, Epoch) ->
Res = machi_flu0:trim(Pid, Epoch),
event_add(trim, Pid, Res),
Res.
@ -133,13 +168,13 @@ m_stop(Pid) ->
event_add(stop, Pid, Res),
Res.
m_proj_write(Pid, ProjNum, Proj) ->
Res = machi_flu0:proj_write(Pid, ProjNum, Proj),
m_proj_write(Pid, Epoch, Proj) ->
Res = machi_flu0:proj_write(Pid, Epoch, Proj),
event_add(proj_write, Pid, Res),
Res.
m_proj_read(Pid, ProjNum) ->
Res = machi_flu0:proj_read(Pid, ProjNum),
m_proj_read(Pid, Epoch) ->
Res = machi_flu0:proj_read(Pid, Epoch),
event_add(proj_read, Pid, Res),
Res.
@ -153,7 +188,114 @@ m_proj_read_latest(Pid) ->
event_add(proj_read_latest, Pid, Res),
Res.
m_append_page(Proj, Bytes) ->
m_append_page(Proj, Bytes, 5).
m_append_page(Proj, _Bytes, 0) ->
{{error_failed, ?MODULE, ?LINE}, Proj};
m_append_page(Proj, Bytes, Retries) ->
Retry = fun() ->
case poll_for_new_epoch_projection(Proj) of
{ok, NewProj} ->
m_append_page(NewProj, Bytes, Retries - 1);
Else ->
{Else, Proj}
end
end,
case m_append_page2(Proj, Bytes) of
%% lost_race ->
%% m_append_pageQQ(Proj, Bytes, Retries - 1);
{error_stale_projection, _} ->
Retry();
error_wedged ->
Retry();
Else ->
{Else, Proj}
end.
m_append_page2(#proj{epoch=Epoch, active=Active}, Bytes) ->
m_append_page3(Active, Epoch, Bytes).
m_append_page3([], _Epoch, _Bytes) ->
ok;
m_append_page3([H|T], Epoch, Bytes) ->
Res = (catch m_write(H, Epoch, Bytes)),
case Res of
ok ->
m_append_page3(T, Epoch, Bytes);
error_unwritten ->
exit({gack, line, ?LINE});
error_written ->
case m_read(H, Epoch) of
{ok, Present} when Present == Bytes ->
m_append_page3(T, Epoch, Bytes);
{error_stale_projection, _}=ESP ->
ESP;
Else ->
Else
end;
Else ->
Else
end.
%% L ->
%% case [x || {error_stale_projection, _} <- L] of
%% [] ->
%% UnwrittenP = lists:member(error_unwritten, L),
%% WrittenP = lists:member(error_written, L),
%% TrimmedP = lists:member(error_trimmed, L),
%% WedgedP = lists:member(error_wedged, L),
%% if UnwrittenP ->
%% error_unwritten;
%% WrittenP ->
%% error_written;
%% TrimmedP ->
%% error_trimmed;
%% WedgedP ->
%% error_wedged;
%% true ->
%% exit({gack, L})
%% end;
%% _ ->
%% {error_stale_projection, caller_not_looking_here}
%% end
%% end.
get_poll_retries() ->
25.
get_poll_sleep_time() ->
50.
poll_for_new_epoch_projection(P) ->
poll_for_new_epoch_projection(P, get_poll_retries()).
poll_for_new_epoch_projection(_P, 0) ->
exit({ouch, ?MODULE, ?LINE});
poll_for_new_epoch_projection(#proj{all=All} = P, Tries) ->
case multi_call(All, ?MODULE, m_proj_read_latest, []) of
[] ->
timer:sleep(get_poll_sleep_time()),
poll_for_new_epoch_projection(P, Tries - 1);
L ->
Answer = lists:last(lists:sort(lists:flatten(L))),
{ok, Answer}
end.
multi_call([], _Mod, _Fun, _ArgSuffix) ->
[];
multi_call([H|T], Mod, Fun, ArgSuffix) ->
case erlang:apply(Mod,Fun, [H|ArgSuffix]) of
{ok, X} ->
[X|multi_call(T, Mod, Fun, ArgSuffix)];
_ ->
multi_call(T, Mod, Fun, ArgSuffix)
end.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
event_setup() ->
lamport_clock:reset(),
Tab = ?MODULE,
ok = event_shutdown(),
ets:new(Tab, [named_table, ordered_set, public]).

View file

@ -1,7 +1,7 @@
-module(lamport_clock).
-export([init/0, get/0, update/1, incr/0]).
-export([init/0, reset/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
@ -10,12 +10,15 @@
init() ->
case get(?KEY) of
undefined ->
FakeTOD = 0,
put(?KEY, FakeTOD + 1);
reset();
N when is_integer(N) ->
ok
end.
reset() ->
FakeTOD = 0,
put(?KEY, FakeTOD + 1).
get() ->
init(),
get(?KEY).
@ -35,6 +38,9 @@ incr() ->
init() ->
ok.
reset() ->
ok.
get() ->
ok.