WIP: done (I hope) adding Lamport clocks
This commit is contained in:
parent
ee7bc2645b
commit
f7447e8953
2 changed files with 91 additions and 49 deletions
|
@ -39,7 +39,7 @@ start_link(Name) when is_list(Name) ->
|
||||||
gen_server:start_link(?MODULE, [Name], []).
|
gen_server:start_link(?MODULE, [Name], []).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
gen_server:call(Pid, stop, infinity).
|
g_call(Pid, stop, infinity).
|
||||||
|
|
||||||
read(Pid, ProjNum) ->
|
read(Pid, ProjNum) ->
|
||||||
g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME).
|
g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME).
|
||||||
|
@ -51,16 +51,16 @@ trim(Pid, ProjNum) ->
|
||||||
g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
|
g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
|
||||||
|
|
||||||
proj_write(Pid, ProjNum, Proj) ->
|
proj_write(Pid, ProjNum, Proj) ->
|
||||||
gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
|
g_call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
|
||||||
|
|
||||||
proj_read(Pid, ProjNum) ->
|
proj_read(Pid, ProjNum) ->
|
||||||
gen_server:call(Pid, {proj_read, ProjNum}, ?LONG_TIME).
|
g_call(Pid, {proj_read, ProjNum}, ?LONG_TIME).
|
||||||
|
|
||||||
proj_get_latest_num(Pid) ->
|
proj_get_latest_num(Pid) ->
|
||||||
gen_server:call(Pid, {proj_get_latest_num}, ?LONG_TIME).
|
g_call(Pid, {proj_get_latest_num}, ?LONG_TIME).
|
||||||
|
|
||||||
proj_read_latest(Pid) ->
|
proj_read_latest(Pid) ->
|
||||||
gen_server:call(Pid, {proj_read_latest}, ?LONG_TIME).
|
g_call(Pid, {proj_read_latest}, ?LONG_TIME).
|
||||||
|
|
||||||
g_call(Pid, Arg, Timeout) ->
|
g_call(Pid, Arg, Timeout) ->
|
||||||
LC1 = lclock_get(),
|
LC1 = lclock_get(),
|
||||||
|
@ -119,25 +119,30 @@ handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S
|
||||||
LC2 = lclock_update(LC1),
|
LC2 = lclock_update(LC1),
|
||||||
{reply, {error_trimmed, LC2}, S};
|
{reply, {error_trimmed, LC2}, S};
|
||||||
|
|
||||||
handle_call({proj_write, ProjNum, Proj}, _From, S) ->
|
handle_call({{proj_write, ProjNum, Proj}, LC1}, _From, S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
|
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
|
||||||
{reply, Reply, NewS};
|
{reply, {Reply, LC2}, NewS};
|
||||||
handle_call({proj_read, ProjNum}, _From, S) ->
|
handle_call({{proj_read, ProjNum}, LC1}, _From, S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
{Reply, NewS} = do_proj_read(ProjNum, S),
|
{Reply, NewS} = do_proj_read(ProjNum, S),
|
||||||
{reply, Reply, NewS};
|
{reply, {Reply, LC2}, NewS};
|
||||||
handle_call({proj_get_latest_num}, _From, S) ->
|
handle_call({{proj_get_latest_num}, LC1}, _From, S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
{Reply, NewS} = do_proj_get_latest_num(S),
|
{Reply, NewS} = do_proj_get_latest_num(S),
|
||||||
{reply, Reply, NewS};
|
{reply, {Reply, LC2}, NewS};
|
||||||
handle_call({proj_read_latest}, _From, S) ->
|
handle_call({{proj_read_latest}, LC1}, _From, S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
case do_proj_get_latest_num(S) of
|
case do_proj_get_latest_num(S) of
|
||||||
{error_unwritten, _S} ->
|
{error_unwritten, _S} ->
|
||||||
{reply, error_unwritten, S};
|
{reply, {error_unwritten, LC2}, S};
|
||||||
{{ok, ProjNum}, _S} ->
|
{{ok, ProjNum}, _S} ->
|
||||||
Proj = orddict:fetch(ProjNum, S#state.proj_store),
|
Proj = orddict:fetch(ProjNum, S#state.proj_store),
|
||||||
{reply, {ok, Proj}, S}
|
{reply, {{ok, Proj}, LC2}, S}
|
||||||
end;
|
end;
|
||||||
handle_call(stop, _From, MLP) ->
|
handle_call({stop, LC1}, _From, MLP) ->
|
||||||
{stop, normal, ok, MLP};
|
LC2 = lclock_update(LC1),
|
||||||
|
{stop, normal, {ok, LC2}, MLP};
|
||||||
handle_call(_Request, _From, MLP) ->
|
handle_call(_Request, _From, MLP) ->
|
||||||
Reply = whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,
|
Reply = whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa,
|
||||||
{reply, Reply, MLP}.
|
{reply, Reply, MLP}.
|
||||||
|
|
|
@ -30,22 +30,23 @@ concuerror3_test() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
concuerror4_test() ->
|
concuerror4_test() ->
|
||||||
|
event_setup(),
|
||||||
{ok, F1} = machi_flu0:start_link("one"),
|
{ok, F1} = machi_flu0:start_link("one"),
|
||||||
ProjNum = 1,
|
ProjNum = 1,
|
||||||
ok = machi_flu0:proj_write(F1, ProjNum, dontcare),
|
ok = m_proj_write(F1, ProjNum, dontcare),
|
||||||
|
|
||||||
Val = <<"val!">>,
|
Val = <<"val!">>,
|
||||||
ok = machi_flu0:write(F1, ProjNum, Val),
|
ok = m_write(F1, ProjNum, Val),
|
||||||
{error_stale_projection, ProjNum} = machi_flu0:write(F1, ProjNum - 1, Val),
|
{error_stale_projection, ProjNum} = m_write(F1, ProjNum - 1, Val),
|
||||||
|
|
||||||
Me = self(),
|
Me = self(),
|
||||||
TrimFun = fun() -> Res = machi_flu0:trim(F1, ProjNum),
|
TrimFun = fun() -> Res = m_trim(F1, ProjNum),
|
||||||
Me ! {self(), Res}
|
Me ! {self(), Res}
|
||||||
end,
|
end,
|
||||||
TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)],
|
TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)],
|
||||||
TrimExpected = [error_trimmed,error_trimmed,ok],
|
TrimExpected = [error_trimmed,error_trimmed,ok],
|
||||||
|
|
||||||
GetFun = fun() -> Res = machi_flu0:read(F1, ProjNum),
|
GetFun = fun() -> Res = m_read(F1, ProjNum),
|
||||||
Me ! {self(), Res}
|
Me ! {self(), Res}
|
||||||
end,
|
end,
|
||||||
GetPids = [spawn(GetFun)],
|
GetPids = [spawn(GetFun)],
|
||||||
|
@ -63,72 +64,108 @@ concuerror4_test() ->
|
||||||
end || GetPid <- GetPids]),
|
end || GetPid <- GetPids]),
|
||||||
ok = GetExpected(GetResults),
|
ok = GetExpected(GetResults),
|
||||||
|
|
||||||
ok = machi_flu0:stop(F1),
|
ok = m_stop(F1),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
proj_store_test() ->
|
proj_store_test() ->
|
||||||
|
event_setup(),
|
||||||
{ok, F1} = machi_flu0:start_link("one"),
|
{ok, F1} = machi_flu0:start_link("one"),
|
||||||
|
|
||||||
error_unwritten = machi_flu0:proj_get_latest_num(F1),
|
error_unwritten = m_proj_get_latest_num(F1),
|
||||||
error_unwritten = machi_flu0:proj_read_latest(F1),
|
error_unwritten = m_proj_read_latest(F1),
|
||||||
|
|
||||||
Proj1 = whatever1,
|
Proj1 = whatever1,
|
||||||
ok = machi_flu0:proj_write(F1, 1, Proj1),
|
ok = m_proj_write(F1, 1, Proj1),
|
||||||
error_written = machi_flu0:proj_write(F1, 1, Proj1),
|
error_written = m_proj_write(F1, 1, Proj1),
|
||||||
{ok, Proj1} = machi_flu0:proj_read(F1, 1),
|
{ok, Proj1} = m_proj_read(F1, 1),
|
||||||
{ok, 1} = machi_flu0:proj_get_latest_num(F1),
|
{ok, 1} = m_proj_get_latest_num(F1),
|
||||||
{ok, Proj1} = machi_flu0:proj_read_latest(F1),
|
{ok, Proj1} = m_proj_read_latest(F1),
|
||||||
|
|
||||||
ok = machi_flu0:stop(F1),
|
ok = m_stop(F1),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
wedge_test() ->
|
wedge_test() ->
|
||||||
event_setup(),
|
event_setup(),
|
||||||
{ok, F1} = machi_flu0:start_link("one"),
|
{ok, F1} = machi_flu0:start_link("one"),
|
||||||
ProjNum1 = 1,
|
ProjNum1 = 1,
|
||||||
ok = machi_flu0:proj_write(F1, ProjNum1, dontcare),
|
ok = m_proj_write(F1, ProjNum1, dontcare),
|
||||||
|
|
||||||
Val = <<"val!">>,
|
Val = <<"val!">>,
|
||||||
ok = machi_flu0:write(F1, ProjNum1, Val),
|
ok = m_write(F1, ProjNum1, Val),
|
||||||
{error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val),
|
{error_stale_projection, ProjNum1} = m_write(F1, ProjNum1 - 1, Val),
|
||||||
error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val),
|
error_wedged = m_write(F1, ProjNum1 + 1, Val),
|
||||||
%% Until we write a newer/bigger projection, all ops are error_wedged
|
%% Until we write a newer/bigger projection, all ops are error_wedged
|
||||||
error_wedged = read(F1, ProjNum1),
|
error_wedged = m_read(F1, ProjNum1),
|
||||||
error_wedged = machi_flu0:write(F1, ProjNum1, Val),
|
error_wedged = m_write(F1, ProjNum1, Val),
|
||||||
error_wedged = machi_flu0:trim(F1, ProjNum1),
|
error_wedged = m_trim(F1, ProjNum1),
|
||||||
|
|
||||||
ProjNum2 = ProjNum1 + 1,
|
ProjNum2 = ProjNum1 + 1,
|
||||||
ok = machi_flu0:proj_write(F1, ProjNum2, dontcare),
|
ok = m_proj_write(F1, ProjNum2, dontcare),
|
||||||
{ok, Val} = read(F1, ProjNum2),
|
{ok, Val} = m_read(F1, ProjNum2),
|
||||||
error_written = machi_flu0:write(F1, ProjNum2, Val),
|
error_written = m_write(F1, ProjNum2, Val),
|
||||||
ok = machi_flu0:trim(F1, ProjNum2),
|
ok = m_trim(F1, ProjNum2),
|
||||||
error_trimmed = machi_flu0:trim(F1, ProjNum2),
|
error_trimmed = m_trim(F1, ProjNum2),
|
||||||
|
|
||||||
ok = machi_flu0:stop(F1),
|
ok = m_stop(F1),
|
||||||
XX = event_get_all(), io:format(user, "XX ~p\n", [XX]),
|
_XX = event_get_all(), io:format(user, "XX ~p\n", [_XX]),
|
||||||
event_shutdown(),
|
event_shutdown(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
|
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
|
||||||
|
|
||||||
read(Pid, ProjNum) ->
|
m_write(Pid, ProjNum1, Val) ->
|
||||||
|
Res = machi_flu0:write(Pid, ProjNum1, Val),
|
||||||
|
event_add(write, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_read(Pid, ProjNum) ->
|
||||||
Res = machi_flu0:read(Pid, ProjNum),
|
Res = machi_flu0:read(Pid, ProjNum),
|
||||||
event_add(get, Res),
|
event_add(get, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_trim(Pid, ProjNum) ->
|
||||||
|
Res = machi_flu0:trim(Pid, ProjNum),
|
||||||
|
event_add(trim, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_stop(Pid) ->
|
||||||
|
Res = machi_flu0:stop(Pid),
|
||||||
|
event_add(stop, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_proj_write(Pid, ProjNum, Proj) ->
|
||||||
|
Res = machi_flu0:proj_write(Pid, ProjNum, Proj),
|
||||||
|
event_add(proj_write, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_proj_read(Pid, ProjNum) ->
|
||||||
|
Res = machi_flu0:proj_read(Pid, ProjNum),
|
||||||
|
event_add(proj_read, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_proj_get_latest_num(Pid) ->
|
||||||
|
Res = machi_flu0:proj_get_latest_num(Pid),
|
||||||
|
event_add(proj_get_latest_num, Pid, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
m_proj_read_latest(Pid) ->
|
||||||
|
Res = machi_flu0:proj_read_latest(Pid),
|
||||||
|
event_add(proj_read_latest, Pid, Res),
|
||||||
Res.
|
Res.
|
||||||
|
|
||||||
event_setup() ->
|
event_setup() ->
|
||||||
Tab = ?MODULE,
|
Tab = ?MODULE,
|
||||||
ok = event_shutdown(),
|
ok = event_shutdown(),
|
||||||
ets:new(Tab, [named_table, ordered_set]).
|
ets:new(Tab, [named_table, ordered_set, public]).
|
||||||
|
|
||||||
event_shutdown() ->
|
event_shutdown() ->
|
||||||
Tab = ?MODULE,
|
Tab = ?MODULE,
|
||||||
(catch ets:delete(Tab)),
|
(catch ets:delete(Tab)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
event_add(Key, Description) ->
|
event_add(Key, Who, Description) ->
|
||||||
Tab = ?MODULE,
|
Tab = ?MODULE,
|
||||||
ets:insert(Tab, {lamport_clock:get(), Key, Description}).
|
ets:insert(Tab, {lamport_clock:get(), Key, Who, Description}).
|
||||||
|
|
||||||
event_get_all() ->
|
event_get_all() ->
|
||||||
Tab = ?MODULE,
|
Tab = ?MODULE,
|
||||||
|
|
Loading…
Reference in a new issue