Add checkpoint support for tango_dt_register

This commit is contained in:
Scott Lystig Fritchie 2014-09-19 16:41:31 +09:00
parent 7b9c94553c
commit 004a18d948
2 changed files with 31 additions and 9 deletions

View file

@ -22,12 +22,14 @@
-behaviour(tango_dt). -behaviour(tango_dt).
-export([start_link/4, -export([start_link/4, stop/1,
set/2, get/1]). set/2, get/1,
checkpoint/1]).
%% Tango datatype callbacks %% Tango datatype callbacks
-export([fresh/0, -export([fresh/0,
do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]). do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000). -define(LONG_TIME, 30*1000).
@ -36,12 +38,18 @@ start_link(PageSize, SequencerPid, Proj, StreamNum) ->
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum], [PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
[]). []).
stop(Pid) ->
tango_dt:stop(Pid).
set(Pid, Val) -> set(Pid, Val) ->
gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME). gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME).
get(Pid) -> get(Pid) ->
gen_server:call(Pid, {cb_pure_op, {o_get}}, ?LONG_TIME). gen_server:call(Pid, {cb_pure_op, {o_get}}, ?LONG_TIME).
checkpoint(Pid) ->
tango_dt:checkpoint(Pid).
fresh() -> fresh() ->
undefined. undefined.
@ -49,8 +57,11 @@ fresh() ->
do_pure_op({o_get}, Register) -> do_pure_op({o_get}, Register) ->
{ok, Register}. {ok, Register}.
do_dirty_op({o_set, _Val}=Op, _From, do_dirty_op(Op0, _From,
I_State, StreamNum, Proj0, PageSize, BackPs) -> I_State, StreamNum, Proj0, PageSize, BackPs) ->
Op = if is_list(Op0) -> Op0;
true -> [Op0] % always make a list
end,
Page = term_to_binary(Op), Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
@ -58,10 +69,13 @@ do_dirty_op({o_set, _Val}=Op, _From,
NewBackPs = tango:add_back_pointer(BackPs, LPN), NewBackPs = tango:add_back_pointer(BackPs, LPN),
{op_t_async, I_State, Proj1, LPN, NewBackPs}. {op_t_async, I_State, Proj1, LPN, NewBackPs}.
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> do_checkpoint(Register=_I_State) ->
[{o_set, Register}].
play_log_mutate_i_state(Pages, _SideEffectsP, OldRegister=_I_State) ->
lists:foldl(fun({o_set, Val}=_Op, _OldVal) -> lists:foldl(fun({o_set, Val}=_Op, _OldVal) ->
Val Val
end, end,
I_State, OldRegister,
[binary_to_term(Page) || Page <- Pages]). lists:append([binary_to_term(Page) || Page <- Pages])).

View file

@ -190,9 +190,17 @@ tango_dt_register_int(PageSize, Seq, Proj) ->
%% instance should also see the update. %% instance should also see the update.
NewVal = {"Heh", "a new value"}, NewVal = {"Heh", "a new value"},
ok = tango_dt_register:set(Reg2, NewVal), ok = tango_dt_register:set(Reg2, NewVal),
{ok, NewVal} = tango_dt_register:get(Reg2b), C1 = fun() -> {ok, NewVal} = tango_dt_register:get(Reg2), % sanity check
{ok, NewVal} = tango_dt_register:get(Reg2), % sanity check {ok, NewVal} = tango_dt_register:get(Reg2b), ok end,
ok = C1(),
ok = tango_dt_register:checkpoint(Reg2),
ok = C1(),
{ok, Reg2c} = tango_dt_register:start_link(PageSize, Seq, Proj,
Reg2Num),
{ok, NewVal} = tango_dt_register:get(Reg2c),
[ok = tango_dt_register:stop(X) || X <- [Reg1, Reg2, Reg2b, Reg2c]],
ok. ok.
tango_dt_map_test() -> tango_dt_map_test() ->