Change semantics of OID map, silly me, to match what's needed
This commit is contained in:
parent
436c6ac14b
commit
6067e26201
2 changed files with 94 additions and 28 deletions
|
@ -24,17 +24,20 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/4, stop/1,
|
-export([start_link/4, stop/1,
|
||||||
put/3, get/2]).
|
new/2, get/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
%% Tango datatype callbacks (prototype)
|
||||||
|
-export([fresh/0,
|
||||||
|
do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(OID_STREAM_NUMBER, 0).
|
-define(OID_STREAM_NUMBER, 0).
|
||||||
|
|
||||||
-define(LONG_TIME, 5*1000).
|
-define(LONG_TIME, 30*1000).
|
||||||
%% -define(LONG_TIME, 30*1000).
|
|
||||||
|
|
||||||
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||||
|
|
||||||
|
@ -58,8 +61,8 @@ start_link(PageSize, SequencerPid, Proj, CallbackMod) ->
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
||||||
|
|
||||||
put(Pid, Key, Val) ->
|
new(Pid, Key) ->
|
||||||
gen_server:call(Pid, {put, Key, Val}, ?LONG_TIME).
|
gen_server:call(Pid, {new, Key}, ?LONG_TIME).
|
||||||
|
|
||||||
get(Pid, Key) ->
|
get(Pid, Key) ->
|
||||||
gen_server:call(Pid, {get, Key}, ?LONG_TIME).
|
gen_server:call(Pid, {get, Key}, ?LONG_TIME).
|
||||||
|
@ -69,7 +72,7 @@ get(Pid, Key) ->
|
||||||
init([PageSize, SequencerPid, Proj, CallbackMod]) ->
|
init([PageSize, SequencerPid, Proj, CallbackMod]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid),
|
LastLPN = find_last_lpn(SequencerPid),
|
||||||
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
||||||
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod),
|
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false),
|
||||||
{ok, #state{page_size=PageSize,
|
{ok, #state{page_size=PageSize,
|
||||||
seq=SequencerPid,
|
seq=SequencerPid,
|
||||||
proj=Proj,
|
proj=Proj,
|
||||||
|
@ -79,28 +82,36 @@ init([PageSize, SequencerPid, Proj, CallbackMod]) ->
|
||||||
back_ps=BackPs,
|
back_ps=BackPs,
|
||||||
i_state=I_State}}.
|
i_state=I_State}}.
|
||||||
|
|
||||||
handle_call({put, _Key, _Val}=Op, _From,
|
handle_call({new, Key}, From,
|
||||||
#state{proj=Proj0, cb_mod=CallbackMod,
|
#state{proj=Proj0, cb_mod=CallbackMod,
|
||||||
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
||||||
{Res, I_State2, Proj1, LPN, NewBackPs} =
|
Op = {new_oid, Key, From},
|
||||||
|
{_Res, I_State2, Proj1, LPN, NewBackPs} =
|
||||||
CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
||||||
Proj0, PageSize, BackPs),
|
Proj0, PageSize, BackPs),
|
||||||
{reply, Res, State#state{i_state=I_State2,
|
%% Let's see how much trouble we can get outselves in here.
|
||||||
proj=Proj1,
|
%% If we're here, then we've written to the log without error.
|
||||||
last_write_lpn=LPN,
|
%% So then the cast to roll forward must see that log entry
|
||||||
back_ps=NewBackPs}};
|
%% (if it also operates without error). So, the side-effect of
|
||||||
|
%% the op ought to always send a reply to the client.
|
||||||
|
gen_server:cast(self(), {roll_forward}),
|
||||||
|
{noreply, State#state{i_state=I_State2,
|
||||||
|
proj=Proj1,
|
||||||
|
last_write_lpn=LPN,
|
||||||
|
back_ps=NewBackPs}};
|
||||||
handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) ->
|
handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) ->
|
||||||
{Pages, State2} = fetch_unread_pages(State),
|
State2 = #state{i_state=I_State} = roll_log_forward(State),
|
||||||
%% ?D(Pages),
|
|
||||||
#state{i_state=I_State} = State3 = play_log_pages(Pages, State2),
|
|
||||||
Reply = CallbackMod:do_pure_op(Op, I_State),
|
Reply = CallbackMod:do_pure_op(Op, I_State),
|
||||||
{reply, Reply, State3};
|
{reply, Reply, State2};
|
||||||
handle_call({stop}, _From, State) ->
|
handle_call({stop}, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
Reply = whaaaaaaaaaaaa,
|
Reply = whaaaaaaaaaaaa,
|
||||||
{reply, Reply, State}.
|
{reply, Reply, State}.
|
||||||
|
|
||||||
|
handle_cast({roll_forward}, State) ->
|
||||||
|
State2 = roll_log_forward(State),
|
||||||
|
{noreply, State2};
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
@ -142,9 +153,60 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN) ->
|
||||||
[], LPNs),
|
[], LPNs),
|
||||||
{BackPs, Pages}.
|
{BackPs, Pages}.
|
||||||
|
|
||||||
play_log_pages(Pages, #state{cb_mod=CallbackMod, i_state=I_State} = State) ->
|
play_log_pages(Pages, SideEffectsP,
|
||||||
I_State2 = play_log_pages(Pages, I_State, CallbackMod),
|
#state{cb_mod=CallbackMod, i_state=I_State} = State) ->
|
||||||
|
I_State2 = play_log_pages(Pages, I_State, CallbackMod, SideEffectsP),
|
||||||
State#state{i_state=I_State2}.
|
State#state{i_state=I_State2}.
|
||||||
|
|
||||||
play_log_pages(Pages, I_State, CallbackMod) ->
|
play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
||||||
CallbackMod:play_log_mutate_i_state(Pages, I_State).
|
CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State).
|
||||||
|
|
||||||
|
roll_log_forward(State) ->
|
||||||
|
{Pages, State2} = fetch_unread_pages(State),
|
||||||
|
play_log_pages(Pages, true, State2).
|
||||||
|
|
||||||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
|
-record(oid_map, {
|
||||||
|
next :: non_neg_integer(),
|
||||||
|
map :: dict()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(DICTMOD, dict).
|
||||||
|
|
||||||
|
fresh() ->
|
||||||
|
#oid_map{next=1,
|
||||||
|
map=?DICTMOD:new()}.
|
||||||
|
|
||||||
|
do_pure_op({get, Key}, #oid_map{map=Dict}) ->
|
||||||
|
?DICTMOD:find(Key, Dict).
|
||||||
|
|
||||||
|
do_dirty_op({new_oid, _Key, _From}=Op,
|
||||||
|
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
||||||
|
Page = term_to_binary(Op),
|
||||||
|
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
||||||
|
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage),
|
||||||
|
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
||||||
|
{ok, I_State, Proj1, LPN, NewBackPs}.
|
||||||
|
|
||||||
|
play_log_mutate_i_state(Pages, SideEffectsP, I_State) ->
|
||||||
|
lists:foldl(fun({new_oid, Key, From}=_Op, #oid_map{map=Dict, next=Next}=O) ->
|
||||||
|
{Res, O2} =
|
||||||
|
case ?DICTMOD:find(Key, Dict) of
|
||||||
|
error ->
|
||||||
|
Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
||||||
|
{{ok, Next},O#oid_map{map=Dict2,
|
||||||
|
next=Next + 1}};
|
||||||
|
{ok, _} ->
|
||||||
|
{already_exists, O}
|
||||||
|
end,
|
||||||
|
if SideEffectsP ->
|
||||||
|
gen_server:reply(From, Res);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
O2
|
||||||
|
end,
|
||||||
|
I_State,
|
||||||
|
[binary_to_term(Page) || Page <- Pages]).
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ tango_oid_smoke_test() ->
|
||||||
fun tango_oid_smoke_test_int/3).
|
fun tango_oid_smoke_test_int/3).
|
||||||
|
|
||||||
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid),
|
||||||
|
|
||||||
ok = tango_oid:stop(OID_Map),
|
ok = tango_oid:stop(OID_Map),
|
||||||
ok.
|
ok.
|
||||||
|
@ -46,18 +46,22 @@ tango_oid_one_test() ->
|
||||||
fun tango_oid_one_test_int/3).
|
fun tango_oid_one_test_int/3).
|
||||||
|
|
||||||
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid),
|
||||||
|
|
||||||
try
|
try
|
||||||
K1 = foo,
|
K1 = foo,
|
||||||
V1 = bar1,
|
K2 = bar,
|
||||||
V2 = bar2,
|
OID_Num1 = 1,
|
||||||
ok = tango_oid:put(OID_Map, K1, V1),
|
{ok, OID_Num1} = tango_oid:new(OID_Map, K1),
|
||||||
{ok, V1} = tango_oid:get(OID_Map, K1),
|
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||||
ok = tango_oid:put(OID_Map, K1, V2),
|
already_exists = tango_oid:new(OID_Map, K1),
|
||||||
{ok, V2} = tango_oid:get(OID_Map, K1),
|
%% The V2 put should *not* have clobbered the previous value
|
||||||
|
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||||
error = tango_oid:get(OID_Map, "does not exist"),
|
error = tango_oid:get(OID_Map, "does not exist"),
|
||||||
|
|
||||||
|
{ok, OID_Num2} = tango_oid:new(OID_Map, K2),
|
||||||
|
{ok, OID_Num2} = tango_oid:get(OID_Map, K2),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
tango_oid:stop(OID_Map)
|
tango_oid:stop(OID_Map)
|
||||||
|
|
Loading…
Reference in a new issue