diff --git a/prototype/poc-machi/src/machi_flu0.erl b/prototype/poc-machi/src/machi_flu0.erl index ff4e6a4..2b5949c 100644 --- a/prototype/poc-machi/src/machi_flu0.erl +++ b/prototype/poc-machi/src/machi_flu0.erl @@ -42,13 +42,13 @@ stop(Pid) -> gen_server:call(Pid, stop, infinity). read(Pid, ProjNum) -> - gen_server:call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME). + g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME). write(Pid, ProjNum, Bin) when is_binary(Bin) -> - gen_server:call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME). + g_call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME). trim(Pid, ProjNum) -> - gen_server:call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME). + g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME). proj_write(Pid, ProjNum, Proj) -> gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME). @@ -62,44 +62,62 @@ proj_get_latest_num(Pid) -> proj_read_latest(Pid) -> gen_server:call(Pid, {proj_read_latest}, ?LONG_TIME). +g_call(Pid, Arg, Timeout) -> + LC1 = lclock_get(), + {Res, LC2} = gen_server:call(Pid, {Arg, LC1}, Timeout), + lclock_update(LC2), + Res. + %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% init([Name]) -> + lclock_init(), {ok, #state{name=Name, proj_num=-42, proj_store=orddict:new()}}. -handle_call({reg_op, _ProjNum, _}, _From, #state{wedged=true} = S) -> - {reply, error_wedged, S}; -handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S) +handle_call({{reg_op, _ProjNum, _}, LC1}, _From, #state{wedged=true} = S) -> + LC2 = lclock_update(LC1), + {reply, {error_wedged, LC2}, S}; +handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S) when ProjNum < MyProjNum -> - {reply, {error_stale_projection, MyProjNum}, S}; -handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S) + LC2 = lclock_update(LC1), + {reply, {{error_stale_projection, MyProjNum}, LC2}, S}; +handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S) when ProjNum > MyProjNum -> - {reply, error_wedged, S#state{wedged=true}}; + LC2 = lclock_update(LC1), + {reply, {error_wedged, LC2}, S#state{wedged=true}}; -handle_call({reg_op, _ProjNum, {write, Bin}}, _From, +handle_call({{reg_op, _ProjNum, {write, Bin}}, LC1}, _From, #state{register=unwritten} = S) -> - {reply, ok, S#state{register=Bin}}; -handle_call({reg_op, _ProjNum, {write, _Bin}}, _From, + LC2 = lclock_update(LC1), + {reply, {ok, LC2}, S#state{register=Bin}}; +handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From, #state{register=B} = S) when is_binary(B) -> - {reply, error_written, S}; -handle_call({reg_op, _ProjNum, {write, _Bin}}, _From, + LC2 = lclock_update(LC1), + {reply, {error_written, LC2}, S}; +handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From, #state{register=trimmed} = S) -> - {reply, error_trimmed, S}; + LC2 = lclock_update(LC1), + {reply, {error_trimmed, LC2}, S}; -handle_call({reg_op, ProjNum, read}, _From, #state{proj_num=MyProjNum} = S) +handle_call({{reg_op, ProjNum, read}, LC1}, _From, #state{proj_num=MyProjNum} = S) when ProjNum /= MyProjNum -> - {reply, {error_stale_projection, MyProjNum}, S}; -handle_call({reg_op, _ProjNum, read}, _From, #state{register=Reg} = S) -> - {reply, {ok, Reg}, S}; + LC2 = lclock_update(LC1), + {reply, {{error_stale_projection, MyProjNum}, LC2}, S}; +handle_call({{reg_op, _ProjNum, read}, LC1}, _From, #state{register=Reg} = S) -> + LC2 = lclock_update(LC1), + {reply, {{ok, Reg}, LC2}, S}; -handle_call({reg_op, _ProjNum, trim}, _From, #state{register=unwritten} = S) -> - {reply, ok, S#state{register=trimmed}}; -handle_call({reg_op, _ProjNum, trim}, _From, #state{register=B} = S) when is_binary(B) -> - {reply, ok, S#state{register=trimmed}}; -handle_call({reg_op, _ProjNum, trim}, _From, #state{register=trimmed} = S) -> - {reply, error_trimmed, S}; +handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=unwritten} = S) -> + LC2 = lclock_update(LC1), + {reply, {ok, LC2}, S#state{register=trimmed}}; +handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) -> + LC2 = lclock_update(LC1), + {reply, {ok, LC2}, S#state{register=trimmed}}; +handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S) -> + LC2 = lclock_update(LC1), + {reply, {error_trimmed, LC2}, S}; handle_call({proj_write, ProjNum, Proj}, _From, S) -> {Reply, NewS} = do_proj_write(ProjNum, Proj, S), @@ -171,3 +189,27 @@ do_proj_get_latest_num(#state{proj_store=D} = S) -> {ProjNum, _Proj} = lists:last(L), {{ok, ProjNum}, S} end. + +-ifdef(TEST). + +lclock_init() -> + lamport_clock:init(). + +lclock_get() -> + lamport_clock:get(). + +lclock_update(LC) -> + lamport_clock:update(LC). + +-else. % PULSE + +lclock_init() -> + ok. + +lclock_get() -> + ok. + +lclock_update(_LC) -> + ok. + +-endif. % TEST diff --git a/prototype/poc-machi/test/machi_flu0_test.erl b/prototype/poc-machi/test/machi_flu0_test.erl index fccf989..3456fa8 100644 --- a/prototype/poc-machi/test/machi_flu0_test.erl +++ b/prototype/poc-machi/test/machi_flu0_test.erl @@ -83,6 +83,7 @@ proj_store_test() -> ok. wedge_test() -> + event_setup(), {ok, F1} = machi_flu0:start_link("one"), ProjNum1 = 1, ok = machi_flu0:proj_write(F1, ProjNum1, dontcare), @@ -92,19 +93,46 @@ wedge_test() -> {error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val), error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val), %% Until we write a newer/bigger projection, all ops are error_wedged - error_wedged = machi_flu0:read(F1, ProjNum1), + error_wedged = read(F1, ProjNum1), error_wedged = machi_flu0:write(F1, ProjNum1, Val), error_wedged = machi_flu0:trim(F1, ProjNum1), ProjNum2 = ProjNum1 + 1, ok = machi_flu0:proj_write(F1, ProjNum2, dontcare), - {ok, Val} = machi_flu0:read(F1, ProjNum2), + {ok, Val} = read(F1, ProjNum2), error_written = machi_flu0:write(F1, ProjNum2, Val), ok = machi_flu0:trim(F1, ProjNum2), error_trimmed = machi_flu0:trim(F1, ProjNum2), ok = machi_flu0:stop(F1), + XX = event_get_all(), io:format(user, "XX ~p\n", [XX]), + event_shutdown(), ok. +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +read(Pid, ProjNum) -> + Res = machi_flu0:read(Pid, ProjNum), + event_add(get, Res), + Res. + +event_setup() -> + Tab = ?MODULE, + ok = event_shutdown(), + ets:new(Tab, [named_table, ordered_set]). + +event_shutdown() -> + Tab = ?MODULE, + (catch ets:delete(Tab)), + ok. + +event_add(Key, Description) -> + Tab = ?MODULE, + ets:insert(Tab, {lamport_clock:get(), Key, Description}). + +event_get_all() -> + Tab = ?MODULE, + ets:tab2list(Tab). + -endif. -endif. diff --git a/prototype/poc-machi/test/pulse_util/lamport_clock.erl b/prototype/poc-machi/test/pulse_util/lamport_clock.erl index fab7244..5ec3412 100644 --- a/prototype/poc-machi/test/pulse_util/lamport_clock.erl +++ b/prototype/poc-machi/test/pulse_util/lamport_clock.erl @@ -10,8 +10,6 @@ init() -> case get(?KEY) of undefined -> - %% {Ca, Cb, _} = now(), - %% FakeTOD = ((Ca * 1000000) + Cb) * 1000000, FakeTOD = 0, put(?KEY, FakeTOD + 1); N when is_integer(N) -> @@ -19,6 +17,7 @@ init() -> end. get() -> + init(), get(?KEY). update(Remote) ->