From f7447e8953093eda49ed8138afc1fe43446427f3 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sun, 28 Sep 2014 17:38:21 +0900 Subject: [PATCH] WIP: done (I hope) adding Lamport clocks --- prototype/poc-machi/src/machi_flu0.erl | 37 ++++--- prototype/poc-machi/test/machi_flu0_test.erl | 103 +++++++++++++------ 2 files changed, 91 insertions(+), 49 deletions(-) diff --git a/prototype/poc-machi/src/machi_flu0.erl b/prototype/poc-machi/src/machi_flu0.erl index 2b5949c..1fb3a2c 100644 --- a/prototype/poc-machi/src/machi_flu0.erl +++ b/prototype/poc-machi/src/machi_flu0.erl @@ -39,7 +39,7 @@ start_link(Name) when is_list(Name) -> gen_server:start_link(?MODULE, [Name], []). stop(Pid) -> - gen_server:call(Pid, stop, infinity). + g_call(Pid, stop, infinity). read(Pid, ProjNum) -> g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME). @@ -51,16 +51,16 @@ trim(Pid, ProjNum) -> g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME). proj_write(Pid, ProjNum, Proj) -> - gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME). + g_call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME). proj_read(Pid, ProjNum) -> - gen_server:call(Pid, {proj_read, ProjNum}, ?LONG_TIME). + g_call(Pid, {proj_read, ProjNum}, ?LONG_TIME). proj_get_latest_num(Pid) -> - gen_server:call(Pid, {proj_get_latest_num}, ?LONG_TIME). + g_call(Pid, {proj_get_latest_num}, ?LONG_TIME). proj_read_latest(Pid) -> - gen_server:call(Pid, {proj_read_latest}, ?LONG_TIME). + g_call(Pid, {proj_read_latest}, ?LONG_TIME). g_call(Pid, Arg, Timeout) -> LC1 = lclock_get(), @@ -119,25 +119,30 @@ handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S LC2 = lclock_update(LC1), {reply, {error_trimmed, LC2}, S}; -handle_call({proj_write, ProjNum, Proj}, _From, S) -> +handle_call({{proj_write, ProjNum, Proj}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_write(ProjNum, Proj, S), - {reply, Reply, NewS}; -handle_call({proj_read, ProjNum}, _From, S) -> + {reply, {Reply, LC2}, NewS}; +handle_call({{proj_read, ProjNum}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_read(ProjNum, S), - {reply, Reply, NewS}; -handle_call({proj_get_latest_num}, _From, S) -> + {reply, {Reply, LC2}, NewS}; +handle_call({{proj_get_latest_num}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_get_latest_num(S), - {reply, Reply, NewS}; -handle_call({proj_read_latest}, _From, S) -> + {reply, {Reply, LC2}, NewS}; +handle_call({{proj_read_latest}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), case do_proj_get_latest_num(S) of {error_unwritten, _S} -> - {reply, error_unwritten, S}; + {reply, {error_unwritten, LC2}, S}; {{ok, ProjNum}, _S} -> Proj = orddict:fetch(ProjNum, S#state.proj_store), - {reply, {ok, Proj}, S} + {reply, {{ok, Proj}, LC2}, S} end; -handle_call(stop, _From, MLP) -> - {stop, normal, ok, MLP}; +handle_call({stop, LC1}, _From, MLP) -> + LC2 = lclock_update(LC1), + {stop, normal, {ok, LC2}, MLP}; handle_call(_Request, _From, MLP) -> Reply = whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, {reply, Reply, MLP}. diff --git a/prototype/poc-machi/test/machi_flu0_test.erl b/prototype/poc-machi/test/machi_flu0_test.erl index 3456fa8..7386168 100644 --- a/prototype/poc-machi/test/machi_flu0_test.erl +++ b/prototype/poc-machi/test/machi_flu0_test.erl @@ -30,22 +30,23 @@ concuerror3_test() -> ok. concuerror4_test() -> + event_setup(), {ok, F1} = machi_flu0:start_link("one"), ProjNum = 1, - ok = machi_flu0:proj_write(F1, ProjNum, dontcare), + ok = m_proj_write(F1, ProjNum, dontcare), Val = <<"val!">>, - ok = machi_flu0:write(F1, ProjNum, Val), - {error_stale_projection, ProjNum} = machi_flu0:write(F1, ProjNum - 1, Val), + ok = m_write(F1, ProjNum, Val), + {error_stale_projection, ProjNum} = m_write(F1, ProjNum - 1, Val), Me = self(), - TrimFun = fun() -> Res = machi_flu0:trim(F1, ProjNum), + TrimFun = fun() -> Res = m_trim(F1, ProjNum), Me ! {self(), Res} end, TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)], TrimExpected = [error_trimmed,error_trimmed,ok], - GetFun = fun() -> Res = machi_flu0:read(F1, ProjNum), + GetFun = fun() -> Res = m_read(F1, ProjNum), Me ! {self(), Res} end, GetPids = [spawn(GetFun)], @@ -63,72 +64,108 @@ concuerror4_test() -> end || GetPid <- GetPids]), ok = GetExpected(GetResults), - ok = machi_flu0:stop(F1), + ok = m_stop(F1), ok. proj_store_test() -> + event_setup(), {ok, F1} = machi_flu0:start_link("one"), - error_unwritten = machi_flu0:proj_get_latest_num(F1), - error_unwritten = machi_flu0:proj_read_latest(F1), + error_unwritten = m_proj_get_latest_num(F1), + error_unwritten = m_proj_read_latest(F1), Proj1 = whatever1, - ok = machi_flu0:proj_write(F1, 1, Proj1), - error_written = machi_flu0:proj_write(F1, 1, Proj1), - {ok, Proj1} = machi_flu0:proj_read(F1, 1), - {ok, 1} = machi_flu0:proj_get_latest_num(F1), - {ok, Proj1} = machi_flu0:proj_read_latest(F1), + ok = m_proj_write(F1, 1, Proj1), + error_written = m_proj_write(F1, 1, Proj1), + {ok, Proj1} = m_proj_read(F1, 1), + {ok, 1} = m_proj_get_latest_num(F1), + {ok, Proj1} = m_proj_read_latest(F1), - ok = machi_flu0:stop(F1), + ok = m_stop(F1), ok. wedge_test() -> event_setup(), {ok, F1} = machi_flu0:start_link("one"), ProjNum1 = 1, - ok = machi_flu0:proj_write(F1, ProjNum1, dontcare), + ok = m_proj_write(F1, ProjNum1, dontcare), Val = <<"val!">>, - ok = machi_flu0:write(F1, ProjNum1, Val), - {error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val), - error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val), + ok = m_write(F1, ProjNum1, Val), + {error_stale_projection, ProjNum1} = m_write(F1, ProjNum1 - 1, Val), + error_wedged = m_write(F1, ProjNum1 + 1, Val), %% Until we write a newer/bigger projection, all ops are error_wedged - error_wedged = read(F1, ProjNum1), - error_wedged = machi_flu0:write(F1, ProjNum1, Val), - error_wedged = machi_flu0:trim(F1, ProjNum1), + error_wedged = m_read(F1, ProjNum1), + error_wedged = m_write(F1, ProjNum1, Val), + error_wedged = m_trim(F1, ProjNum1), ProjNum2 = ProjNum1 + 1, - ok = machi_flu0:proj_write(F1, ProjNum2, dontcare), - {ok, Val} = read(F1, ProjNum2), - error_written = machi_flu0:write(F1, ProjNum2, Val), - ok = machi_flu0:trim(F1, ProjNum2), - error_trimmed = machi_flu0:trim(F1, ProjNum2), + ok = m_proj_write(F1, ProjNum2, dontcare), + {ok, Val} = m_read(F1, ProjNum2), + error_written = m_write(F1, ProjNum2, Val), + ok = m_trim(F1, ProjNum2), + error_trimmed = m_trim(F1, ProjNum2), - ok = machi_flu0:stop(F1), - XX = event_get_all(), io:format(user, "XX ~p\n", [XX]), + ok = m_stop(F1), + _XX = event_get_all(), io:format(user, "XX ~p\n", [_XX]), event_shutdown(), ok. %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% -read(Pid, ProjNum) -> +m_write(Pid, ProjNum1, Val) -> + Res = machi_flu0:write(Pid, ProjNum1, Val), + event_add(write, Pid, Res), + Res. + +m_read(Pid, ProjNum) -> Res = machi_flu0:read(Pid, ProjNum), - event_add(get, Res), + event_add(get, Pid, Res), + Res. + +m_trim(Pid, ProjNum) -> + Res = machi_flu0:trim(Pid, ProjNum), + event_add(trim, Pid, Res), + Res. + +m_stop(Pid) -> + Res = machi_flu0:stop(Pid), + event_add(stop, Pid, Res), + Res. + +m_proj_write(Pid, ProjNum, Proj) -> + Res = machi_flu0:proj_write(Pid, ProjNum, Proj), + event_add(proj_write, Pid, Res), + Res. + +m_proj_read(Pid, ProjNum) -> + Res = machi_flu0:proj_read(Pid, ProjNum), + event_add(proj_read, Pid, Res), + Res. + +m_proj_get_latest_num(Pid) -> + Res = machi_flu0:proj_get_latest_num(Pid), + event_add(proj_get_latest_num, Pid, Res), + Res. + +m_proj_read_latest(Pid) -> + Res = machi_flu0:proj_read_latest(Pid), + event_add(proj_read_latest, Pid, Res), Res. event_setup() -> Tab = ?MODULE, ok = event_shutdown(), - ets:new(Tab, [named_table, ordered_set]). + ets:new(Tab, [named_table, ordered_set, public]). event_shutdown() -> Tab = ?MODULE, (catch ets:delete(Tab)), ok. -event_add(Key, Description) -> +event_add(Key, Who, Description) -> Tab = ?MODULE, - ets:insert(Tab, {lamport_clock:get(), Key, Description}). + ets:insert(Tab, {lamport_clock:get(), Key, Who, Description}). event_get_all() -> Tab = ?MODULE,