WIP: enforce wedging and new projection writes

This commit is contained in:
Scott Lystig Fritchie 2014-09-28 16:03:02 +09:00
parent bebce51ab9
commit 921d90a69b
2 changed files with 96 additions and 37 deletions

View file

@ -3,7 +3,7 @@
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/1, stop/1, -export([start_link/1, stop/1,
write/2, get/1, trim/1, write/3, get/2, trim/2,
proj_write/3, proj_read/2, proj_get_latest_num/1, proj_read_latest/1]). proj_write/3, proj_read/2, proj_get_latest_num/1, proj_read_latest/1]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
@ -29,7 +29,9 @@
-record(state, { -record(state, {
name :: list(), name :: list(),
wedged = false :: boolean(),
register = 'unwritten' :: register(), register = 'unwritten' :: register(),
proj_num :: non_neg_integer(),
proj_store :: dict() proj_store :: dict()
}). }).
@ -39,20 +41,20 @@ start_link(Name) when is_list(Name) ->
stop(Pid) -> stop(Pid) ->
gen_server:call(Pid, stop, infinity). gen_server:call(Pid, stop, infinity).
get(Pid) -> get(Pid, ProjNum) ->
gen_server:call(Pid, {get}, ?LONG_TIME). gen_server:call(Pid, {reg_op, ProjNum, get}, ?LONG_TIME).
write(Pid, Bin) when is_binary(Bin) -> write(Pid, ProjNum, Bin) when is_binary(Bin) ->
gen_server:call(Pid, {write, Bin}, ?LONG_TIME). gen_server:call(Pid, {reg_op, ProjNum, {write, Bin}}, ?LONG_TIME).
trim(Pid) -> trim(Pid, ProjNum) ->
gen_server:call(Pid, {trim}, ?LONG_TIME). gen_server:call(Pid, {reg_op, ProjNum, trim}, ?LONG_TIME).
proj_write(Pid, Num, Proj) -> proj_write(Pid, ProjNum, Proj) ->
gen_server:call(Pid, {proj_write, Num, Proj}, ?LONG_TIME). gen_server:call(Pid, {proj_write, ProjNum, Proj}, ?LONG_TIME).
proj_read(Pid, Num) -> proj_read(Pid, ProjNum) ->
gen_server:call(Pid, {proj_read, Num}, ?LONG_TIME). gen_server:call(Pid, {proj_read, ProjNum}, ?LONG_TIME).
proj_get_latest_num(Pid) -> proj_get_latest_num(Pid) ->
gen_server:call(Pid, {proj_get_latest_num}, ?LONG_TIME). gen_server:call(Pid, {proj_get_latest_num}, ?LONG_TIME).
@ -63,27 +65,47 @@ proj_read_latest(Pid) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
init([Name]) -> init([Name]) ->
{ok, #state{name=Name, proj_store=orddict:new()}}. {ok, #state{name=Name,
proj_num=-42,
proj_store=orddict:new()}}.
handle_call({write, Bin}, _From, #state{register=unwritten} = S) -> handle_call({reg_op, _ProjNum, _}, _From, #state{wedged=true} = S) ->
{reply, error_wedged, S};
handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum < MyProjNum ->
{reply, {error_stale_projection, MyProjNum}, S};
handle_call({reg_op, ProjNum, _}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum > MyProjNum ->
{reply, error_wedged, S#state{wedged=true}};
handle_call({reg_op, _ProjNum, {write, Bin}}, _From,
#state{register=unwritten} = S) ->
{reply, ok, S#state{register=Bin}}; {reply, ok, S#state{register=Bin}};
handle_call({write, _Bin}, _From, #state{register=B} = S) when is_binary(B) -> handle_call({reg_op, _ProjNum, {write, _Bin}}, _From,
#state{register=B} = S) when is_binary(B) ->
{reply, error_written, S}; {reply, error_written, S};
handle_call({write, _Bin}, _From, #state{register=trimmed} = S) -> handle_call({reg_op, _ProjNum, {write, _Bin}}, _From,
#state{register=trimmed} = S) ->
{reply, error_trimmed, S}; {reply, error_trimmed, S};
handle_call({get}, _From, #state{register=Reg} = S) ->
handle_call({reg_op, ProjNum, get}, _From, #state{proj_num=MyProjNum} = S)
when ProjNum /= MyProjNum ->
{reply, {error_stale_projection, MyProjNum}, S};
handle_call({reg_op, _ProjNum, get}, _From, #state{register=Reg} = S) ->
{reply, {ok, Reg}, S}; {reply, {ok, Reg}, S};
handle_call({trim}, _From, #state{register=unwritten} = S) ->
handle_call({reg_op, _ProjNum, trim}, _From, #state{register=unwritten} = S) ->
{reply, ok, S#state{register=trimmed}}; {reply, ok, S#state{register=trimmed}};
handle_call({trim}, _From, #state{register=B} = S) when is_binary(B) -> handle_call({reg_op, _ProjNum, trim}, _From, #state{register=B} = S) when is_binary(B) ->
{reply, ok, S#state{register=trimmed}}; {reply, ok, S#state{register=trimmed}};
handle_call({trim}, _From, #state{register=trimmed} = S) -> handle_call({reg_op, _ProjNum, trim}, _From, #state{register=trimmed} = S) ->
{reply, error_trimmed, S}; {reply, error_trimmed, S};
handle_call({proj_write, Num, Proj}, _From, S) ->
{Reply, NewS} = do_proj_write(Num, Proj, S), handle_call({proj_write, ProjNum, Proj}, _From, S) ->
{Reply, NewS} = do_proj_write(ProjNum, Proj, S),
{reply, Reply, NewS}; {reply, Reply, NewS};
handle_call({proj_read, Num}, _From, S) -> handle_call({proj_read, ProjNum}, _From, S) ->
{Reply, NewS} = do_proj_read(Num, S), {Reply, NewS} = do_proj_read(ProjNum, S),
{reply, Reply, NewS}; {reply, Reply, NewS};
handle_call({proj_get_latest_num}, _From, S) -> handle_call({proj_get_latest_num}, _From, S) ->
{Reply, NewS} = do_proj_get_latest_num(S), {Reply, NewS} = do_proj_get_latest_num(S),
@ -92,8 +114,8 @@ handle_call({proj_read_latest}, _From, S) ->
case do_proj_get_latest_num(S) of case do_proj_get_latest_num(S) of
{error_unwritten, _S} -> {error_unwritten, _S} ->
{reply, error_unwritten, S}; {reply, error_unwritten, S};
{{ok, Num}, _S} -> {{ok, ProjNum}, _S} ->
Proj = orddict:fetch(Num, S#state.proj_store), Proj = orddict:fetch(ProjNum, S#state.proj_store),
{reply, {ok, Proj}, S} {reply, {ok, Proj}, S}
end; end;
handle_call(stop, _From, MLP) -> handle_call(stop, _From, MLP) ->
@ -116,17 +138,25 @@ code_change(_OldVsn, MLP, _Extra) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
do_proj_write(Num, Proj, #state{proj_store=D} = S) -> do_proj_write(ProjNum, Proj, #state{proj_num=MyProjNum, proj_store=D,
case orddict:find(Num, D) of wedged=MyWedged} = S) ->
case orddict:find(ProjNum, D) of
error -> error ->
D2 = orddict:store(Num, Proj, D), D2 = orddict:store(ProjNum, Proj, D),
{ok, S#state{proj_store=D2}}; {NewProjNum, NewWedged} = if ProjNum > MyProjNum ->
{ProjNum, false};
true ->
{MyProjNum, MyWedged}
end,
{ok, S#state{wedged=NewWedged,
proj_num=NewProjNum,
proj_store=D2}};
{ok, _} -> {ok, _} ->
{error_written, S} {error_written, S}
end. end.
do_proj_read(Num, #state{proj_store=D} = S) -> do_proj_read(ProjNum, #state{proj_store=D} = S) ->
case orddict:find(Num, D) of case orddict:find(ProjNum, D) of
error -> error ->
{error_unwritten, S}; {error_unwritten, S};
{ok, Proj} -> {ok, Proj} ->
@ -138,6 +168,6 @@ do_proj_get_latest_num(#state{proj_store=D} = S) ->
[] -> [] ->
{error_unwritten, S}; {error_unwritten, S};
L -> L ->
{Num, _Proj} = lists:last(L), {ProjNum, _Proj} = lists:last(L),
{{ok, Num}, S} {{ok, ProjNum}, S}
end. end.

View file

@ -31,16 +31,21 @@ concuerror3_test() ->
concuerror4_test() -> concuerror4_test() ->
{ok, F1} = machi_flu0:start_link("one"), {ok, F1} = machi_flu0:start_link("one"),
ProjNum = 1,
ok = machi_flu0:proj_write(F1, ProjNum, dontcare),
Val = <<"val!">>, Val = <<"val!">>,
ok = machi_flu0:write(F1, Val), ok = machi_flu0:write(F1, ProjNum, Val),
{error_stale_projection, ProjNum} = machi_flu0:write(F1, ProjNum - 1, Val),
Me = self(), Me = self(),
TrimFun = fun() -> Res = machi_flu0:trim(F1), TrimFun = fun() -> Res = machi_flu0:trim(F1, ProjNum),
Me ! {self(), Res} Me ! {self(), Res}
end, end,
TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)], TrimPids = [spawn(TrimFun), spawn(TrimFun), spawn(TrimFun)],
TrimExpected = [error_trimmed,error_trimmed,ok], TrimExpected = [error_trimmed,error_trimmed,ok],
GetFun = fun() -> Res = machi_flu0:get(F1), GetFun = fun() -> Res = machi_flu0:get(F1, ProjNum),
Me ! {self(), Res} Me ! {self(), Res}
end, end,
GetPids = [spawn(GetFun)], GetPids = [spawn(GetFun)],
@ -77,5 +82,29 @@ proj_store_test() ->
ok = machi_flu0:stop(F1), ok = machi_flu0:stop(F1),
ok. ok.
wedge_test() ->
{ok, F1} = machi_flu0:start_link("one"),
ProjNum1 = 1,
ok = machi_flu0:proj_write(F1, ProjNum1, dontcare),
Val = <<"val!">>,
ok = machi_flu0:write(F1, ProjNum1, Val),
{error_stale_projection, ProjNum1} = machi_flu0:write(F1, ProjNum1 - 1, Val),
error_wedged = machi_flu0:write(F1, ProjNum1 + 1, Val),
%% Until we write a newer/bigger projection, all ops are error_wedged
error_wedged = machi_flu0:get(F1, ProjNum1),
error_wedged = machi_flu0:write(F1, ProjNum1, Val),
error_wedged = machi_flu0:trim(F1, ProjNum1),
ProjNum2 = ProjNum1 + 1,
ok = machi_flu0:proj_write(F1, ProjNum2, dontcare),
{ok, Val} = machi_flu0:get(F1, ProjNum2),
error_written = machi_flu0:write(F1, ProjNum2, Val),
ok = machi_flu0:trim(F1, ProjNum2),
error_trimmed = machi_flu0:trim(F1, ProjNum2),
ok = machi_flu0:stop(F1),
ok.
-endif. -endif.
-endif. -endif.