First draft of tango_dt_register
This commit is contained in:
parent
6067e26201
commit
18b38c249e
3 changed files with 44 additions and 16 deletions
|
@ -23,7 +23,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/4, stop/1,
|
-export([start_link/3, stop/1,
|
||||||
new/2, get/2]).
|
new/2, get/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -47,16 +47,15 @@
|
||||||
page_size :: non_neg_integer(), % CORFU page size
|
page_size :: non_neg_integer(), % CORFU page size
|
||||||
seq :: pid(), % sequencer pid
|
seq :: pid(), % sequencer pid
|
||||||
proj :: term(), % projection
|
proj :: term(), % projection
|
||||||
cb_mod :: atom(), % callback module
|
|
||||||
last_read_lpn :: lpn(), %
|
last_read_lpn :: lpn(), %
|
||||||
last_write_lpn :: lpn(),
|
last_write_lpn :: lpn(),
|
||||||
back_ps :: [lpn()], % back pointers (up to 4)
|
back_ps :: [lpn()], % back pointers (up to 4)
|
||||||
i_state :: term() % internal state thingie
|
i_state :: term() % internal state thingie
|
||||||
}).
|
}).
|
||||||
|
|
||||||
start_link(PageSize, SequencerPid, Proj, CallbackMod) ->
|
start_link(PageSize, SequencerPid, Proj) ->
|
||||||
gen_server:start_link(?MODULE,
|
gen_server:start_link(?MODULE,
|
||||||
[PageSize, SequencerPid, Proj, CallbackMod], []).
|
[PageSize, SequencerPid, Proj], []).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
||||||
|
@ -69,25 +68,24 @@ get(Pid, Key) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
init([PageSize, SequencerPid, Proj, CallbackMod]) ->
|
init([PageSize, SequencerPid, Proj]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid),
|
LastLPN = find_last_lpn(SequencerPid),
|
||||||
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
||||||
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false),
|
I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
|
||||||
{ok, #state{page_size=PageSize,
|
{ok, #state{page_size=PageSize,
|
||||||
seq=SequencerPid,
|
seq=SequencerPid,
|
||||||
proj=Proj,
|
proj=Proj,
|
||||||
cb_mod=CallbackMod,
|
|
||||||
last_read_lpn=LastLPN,
|
last_read_lpn=LastLPN,
|
||||||
last_write_lpn=LastLPN,
|
last_write_lpn=LastLPN,
|
||||||
back_ps=BackPs,
|
back_ps=BackPs,
|
||||||
i_state=I_State}}.
|
i_state=I_State}}.
|
||||||
|
|
||||||
handle_call({new, Key}, From,
|
handle_call({new, Key}, From,
|
||||||
#state{proj=Proj0, cb_mod=CallbackMod,
|
#state{proj=Proj0,
|
||||||
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
||||||
Op = {new_oid, Key, From},
|
Op = {new_oid, Key, From},
|
||||||
{_Res, I_State2, Proj1, LPN, NewBackPs} =
|
{_Res, I_State2, Proj1, LPN, NewBackPs} =
|
||||||
CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
||||||
Proj0, PageSize, BackPs),
|
Proj0, PageSize, BackPs),
|
||||||
%% Let's see how much trouble we can get outselves in here.
|
%% Let's see how much trouble we can get outselves in here.
|
||||||
%% If we're here, then we've written to the log without error.
|
%% If we're here, then we've written to the log without error.
|
||||||
|
@ -99,9 +97,9 @@ handle_call({new, Key}, From,
|
||||||
proj=Proj1,
|
proj=Proj1,
|
||||||
last_write_lpn=LPN,
|
last_write_lpn=LPN,
|
||||||
back_ps=NewBackPs}};
|
back_ps=NewBackPs}};
|
||||||
handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) ->
|
handle_call({get, _Key}=Op, _From, State) ->
|
||||||
State2 = #state{i_state=I_State} = roll_log_forward(State),
|
State2 = #state{i_state=I_State} = roll_log_forward(State),
|
||||||
Reply = CallbackMod:do_pure_op(Op, I_State),
|
Reply = do_pure_op(Op, I_State),
|
||||||
{reply, Reply, State2};
|
{reply, Reply, State2};
|
||||||
handle_call({stop}, _From, State) ->
|
handle_call({stop}, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
@ -154,8 +152,8 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN) ->
|
||||||
{BackPs, Pages}.
|
{BackPs, Pages}.
|
||||||
|
|
||||||
play_log_pages(Pages, SideEffectsP,
|
play_log_pages(Pages, SideEffectsP,
|
||||||
#state{cb_mod=CallbackMod, i_state=I_State} = State) ->
|
#state{i_state=I_State} = State) ->
|
||||||
I_State2 = play_log_pages(Pages, I_State, CallbackMod, SideEffectsP),
|
I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP),
|
||||||
State#state{i_state=I_State2}.
|
State#state{i_state=I_State2}.
|
||||||
|
|
||||||
play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
||||||
|
|
|
@ -36,7 +36,7 @@ tango_oid_smoke_test() ->
|
||||||
fun tango_oid_smoke_test_int/3).
|
fun tango_oid_smoke_test_int/3).
|
||||||
|
|
||||||
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
|
|
||||||
ok = tango_oid:stop(OID_Map),
|
ok = tango_oid:stop(OID_Map),
|
||||||
ok.
|
ok.
|
||||||
|
@ -46,7 +46,7 @@ tango_oid_one_test() ->
|
||||||
fun tango_oid_one_test_int/3).
|
fun tango_oid_one_test_int/3).
|
||||||
|
|
||||||
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
|
|
||||||
try
|
try
|
||||||
K1 = foo,
|
K1 = foo,
|
||||||
|
|
|
@ -157,5 +157,35 @@ scan_backward_test_int(PageSize, _Seq, P1) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
tango_dt_register_test() ->
|
||||||
|
ok = run_test("/tmp", "tango_dt_register",
|
||||||
|
4096, 5*1024, 1, fun tango_dt_register_int/3).
|
||||||
|
|
||||||
|
tango_dt_register_int(PageSize, Seq, Proj) ->
|
||||||
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
|
|
||||||
|
{ok, Reg1Num} = tango_oid:new(OID_Map, "register1"),
|
||||||
|
{ok, Reg1} = tango_dt_register:start_link(PageSize, Seq, Proj,
|
||||||
|
tango_dt_register, Reg1Num),
|
||||||
|
{ok, Reg2Num} = tango_oid:new(OID_Map, "register2"),
|
||||||
|
{ok, Reg2} = tango_dt_register:start_link(PageSize, Seq, Proj,
|
||||||
|
tango_dt_register, Reg2Num),
|
||||||
|
|
||||||
|
NumVals = 8,
|
||||||
|
Vals = [lists:flatten(io_lib:format("version ~w", [X])) ||
|
||||||
|
X <- lists:seq(1, NumVals)],
|
||||||
|
[tango_dt_register:set(Reg, Val) || Reg <- [Reg1, Reg2], Val <- Vals],
|
||||||
|
LastVal = lists:last(Vals),
|
||||||
|
{ok, LastVal} = tango_dt_register:get(Reg1),
|
||||||
|
{ok, LastVal} = tango_dt_register:get(Reg2),
|
||||||
|
|
||||||
|
%% If we instantiate a new instance of an existing register, then
|
||||||
|
%% a single get should show the most recent modification.
|
||||||
|
{ok, Reg2b} = tango_dt_register:start_link(PageSize, Seq, Proj,
|
||||||
|
tango_dt_register, Reg2Num),
|
||||||
|
{ok, LastVal} = tango_dt_register:get(Reg2b),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
-endif. % not PULSE
|
-endif. % not PULSE
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
Loading…
Reference in a new issue