diff --git a/prototype/poc-machi/include/machi.hrl b/prototype/poc-machi/include/machi.hrl new file mode 100644 index 0000000..6b0797f --- /dev/null +++ b/prototype/poc-machi/include/machi.hrl @@ -0,0 +1,6 @@ +-record(proj, { % Projection + epoch :: non_neg_integer(), + all :: list(pid()), + active :: list(pid()) + }). + diff --git a/prototype/poc-machi/src/machi_flu0.erl b/prototype/poc-machi/src/machi_flu0.erl index 1fb3a2c..3c27cef 100644 --- a/prototype/poc-machi/src/machi_flu0.erl +++ b/prototype/poc-machi/src/machi_flu0.erl @@ -2,9 +2,13 @@ -behaviour(gen_server). +-include("machi.hrl"). + -export([start_link/1, stop/1, write/3, read/2, trim/2, proj_write/3, proj_read/2, proj_get_latest_num/1, proj_read_latest/1]). +-export([make_proj/1, make_proj/2]). + -ifdef(TEST). -compile(export_all). -endif. @@ -31,7 +35,7 @@ name :: list(), wedged = false :: boolean(), register = 'unwritten' :: register(), - proj_num :: non_neg_integer(), + proj_epoch :: non_neg_integer(), proj_store :: dict() }). @@ -41,20 +45,20 @@ start_link(Name) when is_list(Name) -> stop(Pid) -> g_call(Pid, stop, infinity). -read(Pid, ProjNum) -> - g_call(Pid, {reg_op, ProjNum, read}, ?LONG_TIME). +read(Pid, Epoch) -> + g_call(Pid, {reg_op, Epoch, read}, ?LONG_TIME). -write(Pid, ProjNum, Bin) when is_binary(Bin) -> - g_call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME). +write(Pid, Epoch, Bin) -> + g_call(Pid, {reg_op, Epoch, {write, Bin}}, ?LONG_TIME). -trim(Pid, ProjNum) -> - g_call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME). +trim(Pid, Epoch) -> + g_call(Pid, {reg_op, Epoch, trim}, ?LONG_TIME). -proj_write(Pid, ProjNum, Proj) -> - g_call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME). +proj_write(Pid, Epoch, Proj) -> + g_call(Pid, {proj_write, Epoch, Proj}, ?LONG_TIME). -proj_read(Pid, ProjNum) -> - g_call(Pid, {proj_read, ProjNum}, ?LONG_TIME). +proj_read(Pid, Epoch) -> + g_call(Pid, {proj_read, Epoch}, ?LONG_TIME). proj_get_latest_num(Pid) -> g_call(Pid, {proj_get_latest_num}, ?LONG_TIME). @@ -70,62 +74,70 @@ g_call(Pid, Arg, Timeout) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% +make_proj(FLUs) -> + make_proj(1, FLUs). + +make_proj(Epoch, FLUs) -> + #proj{epoch=Epoch, all=FLUs, active=FLUs}. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + init([Name]) -> lclock_init(), {ok, #state{name=Name, - proj_num=-42, + proj_epoch=-42, proj_store=orddict:new()}}. -handle_call({{reg_op, _ProjNum, _}, LC1}, _From, #state{wedged=true} = S) -> +handle_call({{reg_op, _Epoch, _}, LC1}, _From, #state{wedged=true} = S) -> LC2 = lclock_update(LC1), {reply, {error_wedged, LC2}, S}; -handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S) - when ProjNum < MyProjNum -> +handle_call({{reg_op, Epoch, _}, LC1}, _From, #state{proj_epoch=MyEpoch} = S) + when Epoch < MyEpoch -> LC2 = lclock_update(LC1), - {reply, {{error_stale_projection, MyProjNum}, LC2}, S}; -handle_call({{reg_op, ProjNum, _}, LC1}, _From, #state{proj_num=MyProjNum} = S) - when ProjNum > MyProjNum -> + {reply, {{error_stale_projection, MyEpoch}, LC2}, S}; +handle_call({{reg_op, Epoch, _}, LC1}, _From, #state{proj_epoch=MyEpoch} = S) + when Epoch > MyEpoch -> LC2 = lclock_update(LC1), {reply, {error_wedged, LC2}, S#state{wedged=true}}; -handle_call({{reg_op, _ProjNum, {write, Bin}}, LC1}, _From, +handle_call({{reg_op, _Epoch, {write, Bin}}, LC1}, _From, #state{register=unwritten} = S) -> LC2 = lclock_update(LC1), {reply, {ok, LC2}, S#state{register=Bin}}; -handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From, +handle_call({{reg_op, _Epoch, {write, _Bin}}, LC1}, _From, #state{register=B} = S) when is_binary(B) -> LC2 = lclock_update(LC1), {reply, {error_written, LC2}, S}; -handle_call({{reg_op, _ProjNum, {write, _Bin}}, LC1}, _From, +handle_call({{reg_op, _Epoch, {write, _Bin}}, LC1}, _From, #state{register=trimmed} = S) -> LC2 = lclock_update(LC1), {reply, {error_trimmed, LC2}, S}; -handle_call({{reg_op, ProjNum, read}, LC1}, _From, #state{proj_num=MyProjNum} = S) - when ProjNum /= MyProjNum -> +handle_call({{reg_op, Epoch, read}, LC1}, _From, #state{proj_epoch=MyEpoch} = S) + when Epoch /= MyEpoch -> LC2 = lclock_update(LC1), - {reply, {{error_stale_projection, MyProjNum}, LC2}, S}; -handle_call({{reg_op, _ProjNum, read}, LC1}, _From, #state{register=Reg} = S) -> + {reply, {{error_stale_projection, MyEpoch}, LC2}, S}; +handle_call({{reg_op, _Epoch, read}, LC1}, _From, #state{register=Reg} = S) -> LC2 = lclock_update(LC1), {reply, {{ok, Reg}, LC2}, S}; -handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=unwritten} = S) -> +handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=unwritten} = S) -> LC2 = lclock_update(LC1), {reply, {ok, LC2}, S#state{register=trimmed}}; -handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) -> +handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=B} = S) when is_binary(B) -> LC2 = lclock_update(LC1), {reply, {ok, LC2}, S#state{register=trimmed}}; -handle_call({{reg_op, _ProjNum, trim}, LC1}, _From, #state{register=trimmed} = S) -> +handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=trimmed} = S) -> LC2 = lclock_update(LC1), {reply, {error_trimmed, LC2}, S}; -handle_call({{proj_write, ProjNum, Proj}, LC1}, _From, S) -> +handle_call({{proj_write, Epoch, Proj}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - {Reply, NewS} = do_proj_write(ProjNum, Proj, S), + {Reply, NewS} = do_proj_write(Epoch, Proj, S), {reply, {Reply, LC2}, NewS}; -handle_call({{proj_read, ProjNum}, LC1}, _From, S) -> +handle_call({{proj_read, Epoch}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - {Reply, NewS} = do_proj_read(ProjNum, S), + {Reply, NewS} = do_proj_read(Epoch, S), {reply, {Reply, LC2}, NewS}; handle_call({{proj_get_latest_num}, LC1}, _From, S) -> LC2 = lclock_update(LC1), @@ -136,8 +148,8 @@ handle_call({{proj_read_latest}, LC1}, _From, S) -> case do_proj_get_latest_num(S) of {error_unwritten, _S} -> {reply, {error_unwritten, LC2}, S}; - {{ok, ProjNum}, _S} -> - Proj = orddict:fetch(ProjNum, S#state.proj_store), + {{ok, Epoch}, _S} -> + Proj = orddict:fetch(Epoch, S#state.proj_store), {reply, {{ok, Proj}, LC2}, S} end; handle_call({stop, LC1}, _From, MLP) -> @@ -161,25 +173,25 @@ code_change(_OldVsn, MLP, _Extra) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% -do_proj_write(ProjNum, Proj, #state{proj_num=MyProjNum, proj_store=D, +do_proj_write(Epoch, Proj, #state{proj_epoch=MyEpoch, proj_store=D, wedged=MyWedged} = S) -> - case orddict:find(ProjNum, D) of + case orddict:find(Epoch, D) of error -> - D2 = orddict:store(ProjNum, Proj, D), - {NewProjNum, NewWedged} = if ProjNum > MyProjNum -> - {ProjNum, false}; + D2 = orddict:store(Epoch, Proj, D), + {NewEpoch, NewWedged} = if Epoch > MyEpoch -> + {Epoch, false}; true -> - {MyProjNum, MyWedged} + {MyEpoch, MyWedged} end, {ok, S#state{wedged=NewWedged, - proj_num=NewProjNum, + proj_epoch=NewEpoch, proj_store=D2}}; {ok, _} -> {error_written, S} end. -do_proj_read(ProjNum, #state{proj_store=D} = S) -> - case orddict:find(ProjNum, D) of +do_proj_read(Epoch, #state{proj_store=D} = S) -> + case orddict:find(Epoch, D) of error -> {error_unwritten, S}; {ok, Proj} -> @@ -191,8 +203,8 @@ do_proj_get_latest_num(#state{proj_store=D} = S) -> [] -> {error_unwritten, S}; L -> - {ProjNum, _Proj} = lists:last(L), - {{ok, ProjNum}, S} + {Epoch, _Proj} = lists:last(L), + {{ok, Epoch}, S} end. -ifdef(TEST). diff --git a/prototype/poc-machi/test/machi_flu0_test.erl b/prototype/poc-machi/test/machi_flu0_test.erl index 7386168..c1dc16a 100644 --- a/prototype/poc-machi/test/machi_flu0_test.erl +++ b/prototype/poc-machi/test/machi_flu0_test.erl @@ -1,6 +1,8 @@ -module(machi_flu0_test). +-include("machi.hrl"). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -32,21 +34,21 @@ concuerror3_test() -> concuerror4_test() -> event_setup(), {ok, F1} = machi_flu0:start_link("one"), - ProjNum = 1, - ok = m_proj_write(F1, ProjNum, dontcare), + Epoch = 1, + ok = m_proj_write(F1, Epoch, dontcare), Val = <<"val!">>, - ok = m_write(F1, ProjNum, Val), - {error_stale_projection, ProjNum} = m_write(F1, ProjNum - 1, Val), + ok = m_write(F1, Epoch, Val), + {error_stale_projection, Epoch} = m_write(F1, Epoch - 1, Val), Me = self(), - TrimFun = fun() -> Res = m_trim(F1, ProjNum), + TrimFun = fun() -> Res = m_trim(F1, Epoch), Me ! {self(), Res} end, TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)], TrimExpected = [error_trimmed,error_trimmed,ok], - GetFun = fun() -> Res = m_read(F1, ProjNum), + GetFun = fun() -> Res = m_read(F1, Epoch), Me ! {self(), Res} end, GetPids = [spawn(GetFun)], @@ -87,44 +89,77 @@ proj_store_test() -> wedge_test() -> event_setup(), {ok, F1} = machi_flu0:start_link("one"), - ProjNum1 = 1, - ok = m_proj_write(F1, ProjNum1, dontcare), + Epoch1 = 1, + ok = m_proj_write(F1, Epoch1, dontcare), Val = <<"val!">>, - ok = m_write(F1, ProjNum1, Val), - {error_stale_projection, ProjNum1} = m_write(F1, ProjNum1 - 1, Val), - error_wedged = m_write(F1, ProjNum1 + 1, Val), + ok = m_write(F1, Epoch1, Val), + {error_stale_projection, Epoch1} = m_write(F1, Epoch1 - 1, Val), + error_wedged = m_write(F1, Epoch1 + 1, Val), %% Until we write a newer/bigger projection, all ops are error_wedged - error_wedged = m_read(F1, ProjNum1), - error_wedged = m_write(F1, ProjNum1, Val), - error_wedged = m_trim(F1, ProjNum1), + error_wedged = m_read(F1, Epoch1), + error_wedged = m_write(F1, Epoch1, Val), + error_wedged = m_trim(F1, Epoch1), - ProjNum2 = ProjNum1 + 1, - ok = m_proj_write(F1, ProjNum2, dontcare), - {ok, Val} = m_read(F1, ProjNum2), - error_written = m_write(F1, ProjNum2, Val), - ok = m_trim(F1, ProjNum2), - error_trimmed = m_trim(F1, ProjNum2), + Epoch2 = Epoch1 + 1, + ok = m_proj_write(F1, Epoch2, dontcare), + {ok, Val} = m_read(F1, Epoch2), + error_written = m_write(F1, Epoch2, Val), + ok = m_trim(F1, Epoch2), + error_trimmed = m_trim(F1, Epoch2), ok = m_stop(F1), _XX = event_get_all(), io:format(user, "XX ~p\n", [_XX]), event_shutdown(), ok. +proj0_test() -> + Me = self(), + event_setup(), + {ok, F1} = machi_flu0:start_link("one"), + {ok, F2} = machi_flu0:start_link("two"), + FLUs = [F1, F2], + FirstProj = machi_flu0:make_proj(1, FLUs), + Epoch1 = FirstProj#proj.epoch, + [ok = m_proj_write(F, Epoch1, FirstProj) || F <- FLUs], + + Proj0 = machi_flu0:make_proj(-42, FLUs), + Val = <<"val!">>, + Pid1 = spawn(fun() -> + {ok, _Proj1} = m_append_page(Proj0, Val), + Me ! {self(), done} + end), + %% Pids = [Pid1], + + SecondProj = machi_flu0:make_proj(2, FLUs), + Epoch2 = SecondProj#proj.epoch, + Pid2 = spawn(fun() -> + [ok = m_proj_write(F, Epoch2, SecondProj) || F <- FLUs], + Me ! {self(), done} + end), + Pids = [Pid1, Pid2], + + [receive {Pid, _} -> ok end || Pid <- Pids], + + [ok = m_stop(F) || F <- FLUs], + _XX = event_get_all(), %% io:format(user, "XX ~p\n", [_XX]), + event_shutdown(), + ok. + %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% -m_write(Pid, ProjNum1, Val) -> - Res = machi_flu0:write(Pid, ProjNum1, Val), +m_write(Pid, Epoch1, Val) -> + Res = machi_flu0:write(Pid, Epoch1, Val), event_add(write, Pid, Res), Res. -m_read(Pid, ProjNum) -> - Res = machi_flu0:read(Pid, ProjNum), +m_read(Pid, Epoch) -> + Res = machi_flu0:read(Pid, Epoch), event_add(get, Pid, Res), Res. -m_trim(Pid, ProjNum) -> - Res = machi_flu0:trim(Pid, ProjNum), +m_trim(Pid, Epoch) -> + Res = machi_flu0:trim(Pid, Epoch), event_add(trim, Pid, Res), Res. @@ -133,13 +168,13 @@ m_stop(Pid) -> event_add(stop, Pid, Res), Res. -m_proj_write(Pid, ProjNum, Proj) -> - Res = machi_flu0:proj_write(Pid, ProjNum, Proj), +m_proj_write(Pid, Epoch, Proj) -> + Res = machi_flu0:proj_write(Pid, Epoch, Proj), event_add(proj_write, Pid, Res), Res. -m_proj_read(Pid, ProjNum) -> - Res = machi_flu0:proj_read(Pid, ProjNum), +m_proj_read(Pid, Epoch) -> + Res = machi_flu0:proj_read(Pid, Epoch), event_add(proj_read, Pid, Res), Res. @@ -153,7 +188,114 @@ m_proj_read_latest(Pid) -> event_add(proj_read_latest, Pid, Res), Res. +m_append_page(Proj, Bytes) -> + m_append_page(Proj, Bytes, 5). + +m_append_page(Proj, _Bytes, 0) -> + {{error_failed, ?MODULE, ?LINE}, Proj}; +m_append_page(Proj, Bytes, Retries) -> + Retry = fun() -> + case poll_for_new_epoch_projection(Proj) of + {ok, NewProj} -> + m_append_page(NewProj, Bytes, Retries - 1); + Else -> + {Else, Proj} + end + end, + + case m_append_page2(Proj, Bytes) of + %% lost_race -> + %% m_append_pageQQ(Proj, Bytes, Retries - 1); + {error_stale_projection, _} -> + Retry(); + error_wedged -> + Retry(); + Else -> + {Else, Proj} + end. + +m_append_page2(#proj{epoch=Epoch, active=Active}, Bytes) -> + m_append_page3(Active, Epoch, Bytes). + +m_append_page3([], _Epoch, _Bytes) -> + ok; +m_append_page3([H|T], Epoch, Bytes) -> + Res = (catch m_write(H, Epoch, Bytes)), + case Res of + ok -> + m_append_page3(T, Epoch, Bytes); + error_unwritten -> + exit({gack, line, ?LINE}); + error_written -> + case m_read(H, Epoch) of + {ok, Present} when Present == Bytes -> + m_append_page3(T, Epoch, Bytes); + {error_stale_projection, _}=ESP -> + ESP; + Else -> + Else + end; + Else -> + Else + end. + %% L -> + %% case [x || {error_stale_projection, _} <- L] of + %% [] -> + %% UnwrittenP = lists:member(error_unwritten, L), + %% WrittenP = lists:member(error_written, L), + %% TrimmedP = lists:member(error_trimmed, L), + %% WedgedP = lists:member(error_wedged, L), + %% if UnwrittenP -> + %% error_unwritten; + %% WrittenP -> + %% error_written; + %% TrimmedP -> + %% error_trimmed; + %% WedgedP -> + %% error_wedged; + %% true -> + %% exit({gack, L}) + %% end; + %% _ -> + %% {error_stale_projection, caller_not_looking_here} + %% end + %% end. + +get_poll_retries() -> + 25. + +get_poll_sleep_time() -> + 50. + +poll_for_new_epoch_projection(P) -> + poll_for_new_epoch_projection(P, get_poll_retries()). + +poll_for_new_epoch_projection(_P, 0) -> + exit({ouch, ?MODULE, ?LINE}); +poll_for_new_epoch_projection(#proj{all=All} = P, Tries) -> + case multi_call(All, ?MODULE, m_proj_read_latest, []) of + [] -> + timer:sleep(get_poll_sleep_time()), + poll_for_new_epoch_projection(P, Tries - 1); + L -> + Answer = lists:last(lists:sort(lists:flatten(L))), + {ok, Answer} + end. + +multi_call([], _Mod, _Fun, _ArgSuffix) -> + []; +multi_call([H|T], Mod, Fun, ArgSuffix) -> + case erlang:apply(Mod,Fun, [H|ArgSuffix]) of + {ok, X} -> + [X|multi_call(T, Mod, Fun, ArgSuffix)]; + _ -> + multi_call(T, Mod, Fun, ArgSuffix) + end. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + event_setup() -> + lamport_clock:reset(), Tab = ?MODULE, ok = event_shutdown(), ets:new(Tab, [named_table, ordered_set, public]). diff --git a/prototype/poc-machi/test/pulse_util/lamport_clock.erl b/prototype/poc-machi/test/pulse_util/lamport_clock.erl index 5ec3412..d8e59b2 100644 --- a/prototype/poc-machi/test/pulse_util/lamport_clock.erl +++ b/prototype/poc-machi/test/pulse_util/lamport_clock.erl @@ -1,7 +1,7 @@ -module(lamport_clock). --export([init/0, get/0, update/1, incr/0]). +-export([init/0, reset/0, get/0, update/1, incr/0]). -define(KEY, ?MODULE). @@ -10,12 +10,15 @@ init() -> case get(?KEY) of undefined -> - FakeTOD = 0, - put(?KEY, FakeTOD + 1); + reset(); N when is_integer(N) -> ok end. +reset() -> + FakeTOD = 0, + put(?KEY, FakeTOD + 1). + get() -> init(), get(?KEY). @@ -35,6 +38,9 @@ incr() -> init() -> ok. +reset() -> + ok. + get() -> ok.