diff --git a/prototype/poc-machi/src/machi_flu0.erl b/prototype/poc-machi/src/machi_flu0.erl index e4704fe..586b622 100644 --- a/prototype/poc-machi/src/machi_flu0.erl +++ b/prototype/poc-machi/src/machi_flu0.erl @@ -3,7 +3,7 @@ -behaviour(gen_server). -export([start_link/1, stop/1, - write/2, get/1, trim/1, + write/3, get/2, trim/2, proj_write/3, proj_read/2, proj_get_latest_num/1, proj_read_latest/1]). -ifdef(TEST). -compile(export_all). @@ -29,7 +29,9 @@ -record(state, { name :: list(), + wedged = false :: boolean(), register = 'unwritten' :: register(), + proj_num :: non_neg_integer(), proj_store :: dict() }). @@ -39,20 +41,20 @@ start_link(Name) when is_list(Name) -> stop(Pid) -> gen_server:call(Pid, stop, infinity). -get(Pid) -> - gen_server:call(Pid, {get}, ?LONG_TIME). +get(Pid, ProjNum) -> + gen_server:call(Pid, {reg_op, ProjNum, get}, ?LONG_TIME). -write(Pid, Bin) when is_binary(Bin) -> - gen_server:call(Pid, {write, Bin}, ?LONG_TIME). +write(Pid, ProjNum, Bin) when is_binary(Bin) -> + gen_server:call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME). -trim(Pid) -> - gen_server:call(Pid, {trim}, ?LONG_TIME). +trim(Pid, ProjNum) -> + gen_server:call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME). -proj_write(Pid, Num, Proj) -> - gen_server:call(Pid, {proj_write, Num, Proj}, ?LONG_TIME). +proj_write(Pid, ProjNum, Proj) -> + gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME). -proj_read(Pid, Num) -> - gen_server:call(Pid, {proj_read, Num}, ?LONG_TIME). +proj_read(Pid, ProjNum) -> + gen_server:call(Pid, {proj_read, ProjNum}, ?LONG_TIME). proj_get_latest_num(Pid) -> gen_server:call(Pid, {proj_get_latest_num}, ?LONG_TIME). @@ -63,27 +65,47 @@ proj_read_latest(Pid) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% init([Name]) -> - {ok, #state{name=Name, proj_store=orddict:new()}}. + {ok, #state{name=Name, + proj_num=-42, + proj_store=orddict:new()}}. -handle_call({write, Bin}, _From, #state{register=unwritten} = S) -> +handle_call({reg_op, _ProjNum, _}, _From, #state{wedged=true} = S) -> + {reply, error_wedged, S}; +handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S) + when ProjNum < MyProjNum -> + {reply, {error_stale_projection, MyProjNum}, S}; +handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S) + when ProjNum > MyProjNum -> + {reply, error_wedged, S#state{wedged=true}}; + +handle_call({reg_op, _ProjNum, {write, Bin}}, _From, + #state{register=unwritten} = S) -> {reply, ok, S#state{register=Bin}}; -handle_call({write, _Bin}, _From, #state{register=B} = S) when is_binary(B) -> +handle_call({reg_op, _ProjNum, {write, _Bin}}, _From, + #state{register=B} = S) when is_binary(B) -> {reply, error_written, S}; -handle_call({write, _Bin}, _From, #state{register=trimmed} = S) -> +handle_call({reg_op, _ProjNum, {write, _Bin}}, _From, + #state{register=trimmed} = S) -> {reply, error_trimmed, S}; -handle_call({get}, _From, #state{register=Reg} = S) -> + +handle_call({reg_op, ProjNum, get}, _From, #state{proj_num=MyProjNum} = S) + when ProjNum /= MyProjNum -> + {reply, {error_stale_projection, MyProjNum}, S}; +handle_call({reg_op, _ProjNum, get}, _From, #state{register=Reg} = S) -> {reply, {ok, Reg}, S}; -handle_call({trim}, _From, #state{register=unwritten} = S) -> + +handle_call({reg_op, _ProjNum, trim}, _From, #state{register=unwritten} = S) -> {reply, ok, S#state{register=trimmed}}; -handle_call({trim}, _From, #state{register=B} = S) when is_binary(B) -> +handle_call({reg_op, _ProjNum, trim}, _From, #state{register=B} = S) when is_binary(B) -> {reply, ok, S#state{register=trimmed}}; -handle_call({trim}, _From, #state{register=trimmed} = S) -> +handle_call({reg_op, _ProjNum, trim}, _From, #state{register=trimmed} = S) -> {reply, error_trimmed, S}; -handle_call({proj_write, Num, Proj}, _From, S) -> - {Reply, NewS} = do_proj_write(Num, Proj, S), + +handle_call({proj_write, ProjNum, Proj}, _From, S) -> + {Reply, NewS} = do_proj_write(ProjNum, Proj, S), {reply, Reply, NewS}; -handle_call({proj_read, Num}, _From, S) -> - {Reply, NewS} = do_proj_read(Num, S), +handle_call({proj_read, ProjNum}, _From, S) -> + {Reply, NewS} = do_proj_read(ProjNum, S), {reply, Reply, NewS}; handle_call({proj_get_latest_num}, _From, S) -> {Reply, NewS} = do_proj_get_latest_num(S), @@ -92,8 +114,8 @@ handle_call({proj_read_latest}, _From, S) -> case do_proj_get_latest_num(S) of {error_unwritten, _S} -> {reply, error_unwritten, S}; - {{ok, Num}, _S} -> - Proj = orddict:fetch(Num, S#state.proj_store), + {{ok, ProjNum}, _S} -> + Proj = orddict:fetch(ProjNum, S#state.proj_store), {reply, {ok, Proj}, S} end; handle_call(stop, _From, MLP) -> @@ -116,17 +138,25 @@ code_change(_OldVsn, MLP, _Extra) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% -do_proj_write(Num, Proj, #state{proj_store=D} = S) -> - case orddict:find(Num, D) of +do_proj_write(ProjNum, Proj, #state{proj_num=MyProjNum, proj_store=D, + wedged=MyWedged} = S) -> + case orddict:find(ProjNum, D) of error -> - D2 = orddict:store(Num, Proj, D), - {ok, S#state{proj_store=D2}}; + D2 = orddict:store(ProjNum, Proj, D), + {NewProjNum, NewWedged} = if ProjNum > MyProjNum -> + {ProjNum, false}; + true -> + {MyProjNum, MyWedged} + end, + {ok, S#state{wedged=NewWedged, + proj_num=NewProjNum, + proj_store=D2}}; {ok, _} -> {error_written, S} end. -do_proj_read(Num, #state{proj_store=D} = S) -> - case orddict:find(Num, D) of +do_proj_read(ProjNum, #state{proj_store=D} = S) -> + case orddict:find(ProjNum, D) of error -> {error_unwritten, S}; {ok, Proj} -> @@ -138,6 +168,6 @@ do_proj_get_latest_num(#state{proj_store=D} = S) -> [] -> {error_unwritten, S}; L -> - {Num, _Proj} = lists:last(L), - {{ok, Num}, S} + {ProjNum, _Proj} = lists:last(L), + {{ok, ProjNum}, S} end. diff --git a/prototype/poc-machi/test/machi_flu0_test.erl b/prototype/poc-machi/test/machi_flu0_test.erl index 09d4d1e..24d3b9e 100644 --- a/prototype/poc-machi/test/machi_flu0_test.erl +++ b/prototype/poc-machi/test/machi_flu0_test.erl @@ -31,16 +31,21 @@ concuerror3_test() -> concuerror4_test() -> {ok, F1} = machi_flu0:start_link("one"), + ProjNum = 1, + ok = machi_flu0:proj_write(F1, ProjNum, dontcare), + Val = <<"val!">>, - ok = machi_flu0:write(F1, Val), + ok = machi_flu0:write(F1, ProjNum, Val), + {error_stale_projection, ProjNum} = machi_flu0:write(F1, ProjNum - 1, Val), + Me = self(), - TrimFun = fun() -> Res = machi_flu0:trim(F1), + TrimFun = fun() -> Res = machi_flu0:trim(F1, ProjNum), Me ! {self(), Res} end, - TrimPids = [spawn(TrimFun), spawn(TrimFun),spawn(TrimFun)], + TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)], TrimExpected = [error_trimmed,error_trimmed,ok], - GetFun = fun() -> Res = machi_flu0:get(F1), + GetFun = fun() -> Res = machi_flu0:get(F1, ProjNum), Me ! {self(), Res} end, GetPids = [spawn(GetFun)], @@ -77,5 +82,29 @@ proj_store_test() -> ok = machi_flu0:stop(F1), ok. +wedge_test() -> + {ok, F1} = machi_flu0:start_link("one"), + ProjNum1 = 1, + ok = machi_flu0:proj_write(F1, ProjNum1, dontcare), + + Val = <<"val!">>, + ok = machi_flu0:write(F1, ProjNum1, Val), + {error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val), + error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val), + %% Until we write a newer/bigger projection, all ops are error_wedged + error_wedged = machi_flu0:get(F1, ProjNum1), + error_wedged = machi_flu0:write(F1, ProjNum1, Val), + error_wedged = machi_flu0:trim(F1, ProjNum1), + + ProjNum2 = ProjNum1 + 1, + ok = machi_flu0:proj_write(F1, ProjNum2, dontcare), + {ok, Val} = machi_flu0:get(F1, ProjNum2), + error_written = machi_flu0:write(F1, ProjNum2, Val), + ok = machi_flu0:trim(F1, ProjNum2), + error_trimmed = machi_flu0:trim(F1, ProjNum2), + + ok = machi_flu0:stop(F1), + ok. + -endif. -endif.