WIP: in the middle of adding Lamport clocks
This commit is contained in:
parent
b443a15542
commit
ee7bc2645b
3 changed files with 98 additions and 29 deletions
|
@ -42,13 +42,13 @@ stop(Pid) ->
|
||||||
gen_server:call(Pid, stop, infinity).
|
gen_server:call(Pid, stop, infinity).
|
||||||
|
|
||||||
read(Pid, ProjNum) ->
|
read(Pid, ProjNum) ->
|
||||||
gen_server:call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME).
|
g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME).
|
||||||
|
|
||||||
write(Pid, ProjNum, Bin) when is_binary(Bin) ->
|
write(Pid, ProjNum, Bin) when is_binary(Bin) ->
|
||||||
gen_server:call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME).
|
g_call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME).
|
||||||
|
|
||||||
trim(Pid, ProjNum) ->
|
trim(Pid, ProjNum) ->
|
||||||
gen_server:call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
|
g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
|
||||||
|
|
||||||
proj_write(Pid, ProjNum, Proj) ->
|
proj_write(Pid, ProjNum, Proj) ->
|
||||||
gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
|
gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
|
||||||
|
@ -62,44 +62,62 @@ proj_get_latest_num(Pid) ->
|
||||||
proj_read_latest(Pid) ->
|
proj_read_latest(Pid) ->
|
||||||
gen_server:call(Pid, {proj_read_latest}, ?LONG_TIME).
|
gen_server:call(Pid, {proj_read_latest}, ?LONG_TIME).
|
||||||
|
|
||||||
|
g_call(Pid, Arg, Timeout) ->
|
||||||
|
LC1 = lclock_get(),
|
||||||
|
{Res, LC2} = gen_server:call(Pid, {Arg, LC1}, Timeout),
|
||||||
|
lclock_update(LC2),
|
||||||
|
Res.
|
||||||
|
|
||||||
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
|
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
|
||||||
|
|
||||||
init([Name]) ->
|
init([Name]) ->
|
||||||
|
lclock_init(),
|
||||||
{ok, #state{name=Name,
|
{ok, #state{name=Name,
|
||||||
proj_num=-42,
|
proj_num=-42,
|
||||||
proj_store=orddict:new()}}.
|
proj_store=orddict:new()}}.
|
||||||
|
|
||||||
handle_call({reg_op, _ProjNum, _}, _From, #state{wedged=true} = S) ->
|
handle_call({{reg_op, _ProjNum, _}, LC1}, _From, #state{wedged=true} = S) ->
|
||||||
{reply, error_wedged, S};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S)
|
{reply, {error_wedged, LC2}, S};
|
||||||
|
handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S)
|
||||||
when ProjNum < MyProjNum ->
|
when ProjNum < MyProjNum ->
|
||||||
{reply, {error_stale_projection, MyProjNum}, S};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S)
|
{reply, {{error_stale_projection, MyProjNum}, LC2}, S};
|
||||||
|
handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S)
|
||||||
when ProjNum > MyProjNum ->
|
when ProjNum > MyProjNum ->
|
||||||
{reply, error_wedged, S#state{wedged=true}};
|
LC2 = lclock_update(LC1),
|
||||||
|
{reply, {error_wedged, LC2}, S#state{wedged=true}};
|
||||||
|
|
||||||
handle_call({reg_op, _ProjNum, {write, Bin}}, _From,
|
handle_call({{reg_op, _ProjNum, {write, Bin}}, LC1}, _From,
|
||||||
#state{register=unwritten} = S) ->
|
#state{register=unwritten} = S) ->
|
||||||
{reply, ok, S#state{register=Bin}};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, _ProjNum, {write, _Bin}}, _From,
|
{reply, {ok, LC2}, S#state{register=Bin}};
|
||||||
|
handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From,
|
||||||
#state{register=B} = S) when is_binary(B) ->
|
#state{register=B} = S) when is_binary(B) ->
|
||||||
{reply, error_written, S};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, _ProjNum, {write, _Bin}}, _From,
|
{reply, {error_written, LC2}, S};
|
||||||
|
handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From,
|
||||||
#state{register=trimmed} = S) ->
|
#state{register=trimmed} = S) ->
|
||||||
{reply, error_trimmed, S};
|
LC2 = lclock_update(LC1),
|
||||||
|
{reply, {error_trimmed, LC2}, S};
|
||||||
|
|
||||||
handle_call({reg_op, ProjNum, read}, _From, #state{proj_num=MyProjNum} = S)
|
handle_call({{reg_op, ProjNum, read}, LC1}, _From, #state{proj_num=MyProjNum} = S)
|
||||||
when ProjNum /= MyProjNum ->
|
when ProjNum /= MyProjNum ->
|
||||||
{reply, {error_stale_projection, MyProjNum}, S};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, _ProjNum, read}, _From, #state{register=Reg} = S) ->
|
{reply, {{error_stale_projection, MyProjNum}, LC2}, S};
|
||||||
{reply, {ok, Reg}, S};
|
handle_call({{reg_op, _ProjNum, read}, LC1}, _From, #state{register=Reg} = S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
|
{reply, {{ok, Reg}, LC2}, S};
|
||||||
|
|
||||||
handle_call({reg_op, _ProjNum, trim}, _From, #state{register=unwritten} = S) ->
|
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=unwritten} = S) ->
|
||||||
{reply, ok, S#state{register=trimmed}};
|
LC2 = lclock_update(LC1),
|
||||||
handle_call({reg_op, _ProjNum, trim}, _From, #state{register=B} = S) when is_binary(B) ->
|
{reply, {ok, LC2}, S#state{register=trimmed}};
|
||||||
{reply, ok, S#state{register=trimmed}};
|
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) ->
|
||||||
handle_call({reg_op, _ProjNum, trim}, _From, #state{register=trimmed} = S) ->
|
LC2 = lclock_update(LC1),
|
||||||
{reply, error_trimmed, S};
|
{reply, {ok, LC2}, S#state{register=trimmed}};
|
||||||
|
handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S) ->
|
||||||
|
LC2 = lclock_update(LC1),
|
||||||
|
{reply, {error_trimmed, LC2}, S};
|
||||||
|
|
||||||
handle_call({proj_write, ProjNum, Proj}, _From, S) ->
|
handle_call({proj_write, ProjNum, Proj}, _From, S) ->
|
||||||
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
|
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
|
||||||
|
@ -171,3 +189,27 @@ do_proj_get_latest_num(#state{proj_store=D} = S) ->
|
||||||
{ProjNum, _Proj} = lists:last(L),
|
{ProjNum, _Proj} = lists:last(L),
|
||||||
{{ok, ProjNum}, S}
|
{{ok, ProjNum}, S}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
lclock_init() ->
|
||||||
|
lamport_clock:init().
|
||||||
|
|
||||||
|
lclock_get() ->
|
||||||
|
lamport_clock:get().
|
||||||
|
|
||||||
|
lclock_update(LC) ->
|
||||||
|
lamport_clock:update(LC).
|
||||||
|
|
||||||
|
-else. % PULSE
|
||||||
|
|
||||||
|
lclock_init() ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
lclock_get() ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
lclock_update(_LC) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-endif. % TEST
|
||||||
|
|
|
@ -83,6 +83,7 @@ proj_store_test() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
wedge_test() ->
|
wedge_test() ->
|
||||||
|
event_setup(),
|
||||||
{ok, F1} = machi_flu0:start_link("one"),
|
{ok, F1} = machi_flu0:start_link("one"),
|
||||||
ProjNum1 = 1,
|
ProjNum1 = 1,
|
||||||
ok = machi_flu0:proj_write(F1, ProjNum1, dontcare),
|
ok = machi_flu0:proj_write(F1, ProjNum1, dontcare),
|
||||||
|
@ -92,19 +93,46 @@ wedge_test() ->
|
||||||
{error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val),
|
{error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val),
|
||||||
error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val),
|
error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val),
|
||||||
%% Until we write a newer/bigger projection, all ops are error_wedged
|
%% Until we write a newer/bigger projection, all ops are error_wedged
|
||||||
error_wedged = machi_flu0:read(F1, ProjNum1),
|
error_wedged = read(F1, ProjNum1),
|
||||||
error_wedged = machi_flu0:write(F1, ProjNum1, Val),
|
error_wedged = machi_flu0:write(F1, ProjNum1, Val),
|
||||||
error_wedged = machi_flu0:trim(F1, ProjNum1),
|
error_wedged = machi_flu0:trim(F1, ProjNum1),
|
||||||
|
|
||||||
ProjNum2 = ProjNum1 + 1,
|
ProjNum2 = ProjNum1 + 1,
|
||||||
ok = machi_flu0:proj_write(F1, ProjNum2, dontcare),
|
ok = machi_flu0:proj_write(F1, ProjNum2, dontcare),
|
||||||
{ok, Val} = machi_flu0:read(F1, ProjNum2),
|
{ok, Val} = read(F1, ProjNum2),
|
||||||
error_written = machi_flu0:write(F1, ProjNum2, Val),
|
error_written = machi_flu0:write(F1, ProjNum2, Val),
|
||||||
ok = machi_flu0:trim(F1, ProjNum2),
|
ok = machi_flu0:trim(F1, ProjNum2),
|
||||||
error_trimmed = machi_flu0:trim(F1, ProjNum2),
|
error_trimmed = machi_flu0:trim(F1, ProjNum2),
|
||||||
|
|
||||||
ok = machi_flu0:stop(F1),
|
ok = machi_flu0:stop(F1),
|
||||||
|
XX = event_get_all(), io:format(user, "XX ~p\n", [XX]),
|
||||||
|
event_shutdown(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
|
||||||
|
|
||||||
|
read(Pid, ProjNum) ->
|
||||||
|
Res = machi_flu0:read(Pid, ProjNum),
|
||||||
|
event_add(get, Res),
|
||||||
|
Res.
|
||||||
|
|
||||||
|
event_setup() ->
|
||||||
|
Tab = ?MODULE,
|
||||||
|
ok = event_shutdown(),
|
||||||
|
ets:new(Tab, [named_table, ordered_set]).
|
||||||
|
|
||||||
|
event_shutdown() ->
|
||||||
|
Tab = ?MODULE,
|
||||||
|
(catch ets:delete(Tab)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
event_add(Key, Description) ->
|
||||||
|
Tab = ?MODULE,
|
||||||
|
ets:insert(Tab, {lamport_clock:get(), Key, Description}).
|
||||||
|
|
||||||
|
event_get_all() ->
|
||||||
|
Tab = ?MODULE,
|
||||||
|
ets:tab2list(Tab).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -10,8 +10,6 @@
|
||||||
init() ->
|
init() ->
|
||||||
case get(?KEY) of
|
case get(?KEY) of
|
||||||
undefined ->
|
undefined ->
|
||||||
%% {Ca, Cb, _} = now(),
|
|
||||||
%% FakeTOD = ((Ca * 1000000) + Cb) * 1000000,
|
|
||||||
FakeTOD = 0,
|
FakeTOD = 0,
|
||||||
put(?KEY, FakeTOD + 1);
|
put(?KEY, FakeTOD + 1);
|
||||||
N when is_integer(N) ->
|
N when is_integer(N) ->
|
||||||
|
@ -19,6 +17,7 @@ init() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get() ->
|
get() ->
|
||||||
|
init(),
|
||||||
get(?KEY).
|
get(?KEY).
|
||||||
|
|
||||||
update(Remote) ->
|
update(Remote) ->
|
||||||
|
|
Loading…
Reference in a new issue