Add checkpoint support for tango_dt_queue
This commit is contained in:
parent
970eb263db
commit
4cf8ac7ed8
3 changed files with 67 additions and 33 deletions
|
@ -22,13 +22,15 @@
|
||||||
|
|
||||||
-behaviour(tango_dt).
|
-behaviour(tango_dt).
|
||||||
|
|
||||||
-export([start_link/4,
|
-export([start_link/4, stop/1,
|
||||||
is_empty/1, length/1, peek/1, to_list/1, member/2,
|
is_empty/1, length/1, peek/1, to_list/1, member/2,
|
||||||
in/2, out/1, reverse/1, filter/2]).
|
in/2, out/1, reverse/1, filter/2,
|
||||||
|
checkpoint/1]).
|
||||||
|
|
||||||
%% Tango datatype callbacks
|
%% Tango datatype callbacks
|
||||||
-export([fresh/0,
|
-export([fresh/0,
|
||||||
do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]).
|
do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
|
||||||
|
play_log_mutate_i_state/3]).
|
||||||
|
|
||||||
-define(LONG_TIME, 30*1000).
|
-define(LONG_TIME, 30*1000).
|
||||||
|
|
||||||
|
@ -39,8 +41,8 @@ start_link(PageSize, SequencerPid, Proj, StreamNum) ->
|
||||||
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
|
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
|
||||||
[]).
|
[]).
|
||||||
|
|
||||||
%% set(Pid, Val) ->
|
stop(Pid) ->
|
||||||
%% gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME).
|
tango_dt:stop(Pid).
|
||||||
|
|
||||||
is_empty(Pid) ->
|
is_empty(Pid) ->
|
||||||
gen_server:call(Pid, {cb_pure_op, {o_is_empty}}, ?LONG_TIME).
|
gen_server:call(Pid, {cb_pure_op, {o_is_empty}}, ?LONG_TIME).
|
||||||
|
@ -69,6 +71,9 @@ reverse(Pid) ->
|
||||||
filter(Pid, Fun) ->
|
filter(Pid, Fun) ->
|
||||||
gen_server:call(Pid, {cb_dirty_op, {o_filter, Fun}}, ?LONG_TIME).
|
gen_server:call(Pid, {cb_dirty_op, {o_filter, Fun}}, ?LONG_TIME).
|
||||||
|
|
||||||
|
checkpoint(Pid) ->
|
||||||
|
tango_dt:checkpoint(Pid).
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
fresh() ->
|
fresh() ->
|
||||||
|
@ -95,6 +100,9 @@ do_dirty_op(Op0, From,
|
||||||
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
||||||
{AsyncType, I_State, Proj1, LPN, NewBackPs}.
|
{AsyncType, I_State, Proj1, LPN, NewBackPs}.
|
||||||
|
|
||||||
|
do_checkpoint(Q=_I_State) ->
|
||||||
|
[{o_start_checkpoint}|[{o_in, X} || X <- queue:to_list(Q)]].
|
||||||
|
|
||||||
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
|
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
|
||||||
lists:foldl(fun({o_in, Val}=_Op, Q) ->
|
lists:foldl(fun({o_in, Val}=_Op, Q) ->
|
||||||
queue:in(Val, Q);
|
queue:in(Val, Q);
|
||||||
|
@ -109,14 +117,18 @@ play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
|
||||||
({o_reverse}, Q) ->
|
({o_reverse}, Q) ->
|
||||||
queue:reverse(Q);
|
queue:reverse(Q);
|
||||||
({o_filter, Fun}, Q) ->
|
({o_filter, Fun}, Q) ->
|
||||||
queue:filter(Fun, Q)
|
queue:filter(Fun, Q);
|
||||||
|
({o_start_checkpoint}, _Q) ->
|
||||||
|
fresh()
|
||||||
end,
|
end,
|
||||||
I_State,
|
I_State,
|
||||||
[binary_to_term(Page) || Page <- Pages]).
|
lists:append([binary_to_term(Page) || Page <- Pages])).
|
||||||
|
|
||||||
transform_dirty_op({o_out}, From) ->
|
transform_dirty_op({o_out}, From) ->
|
||||||
%% This func will be executed on the server side prior to writing
|
%% This func will be executed on the server side prior to writing
|
||||||
%% to the log.
|
%% to the log.
|
||||||
{op_t_sync, {o_out, From, node(), self()}};
|
{op_t_sync, [{o_out, From, node(), self()}]};
|
||||||
|
transform_dirty_op(OpList, _From) when is_list(OpList) ->
|
||||||
|
{op_t_async, OpList};
|
||||||
transform_dirty_op(Op, _From) ->
|
transform_dirty_op(Op, _From) ->
|
||||||
{op_t_async, Op}.
|
{op_t_async, [Op]}.
|
||||||
|
|
|
@ -70,11 +70,13 @@ do_dirty_op(Op0, _From,
|
||||||
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
|
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
|
||||||
|
|
||||||
do_checkpoint(Register=_I_State) ->
|
do_checkpoint(Register=_I_State) ->
|
||||||
[{o_set, Register}].
|
[{o_start_checkpoint},{o_set, Register}].
|
||||||
|
|
||||||
play_log_mutate_i_state(Pages, _SideEffectsP, OldRegister=_I_State) ->
|
play_log_mutate_i_state(Pages, _SideEffectsP, OldRegister=_I_State) ->
|
||||||
lists:foldl(fun({o_set, Val}=_Op, _OldVal) ->
|
lists:foldl(fun({o_set, Val}=_Op, _OldVal) ->
|
||||||
Val
|
Val;
|
||||||
|
({o_start_checkpoint}, _OldVal) ->
|
||||||
|
fresh()
|
||||||
end,
|
end,
|
||||||
OldRegister,
|
OldRegister,
|
||||||
lists:append([binary_to_term(Page) || Page <- Pages])).
|
lists:append([binary_to_term(Page) || Page <- Pages])).
|
||||||
|
|
|
@ -243,37 +243,57 @@ tango_dt_queue_test() ->
|
||||||
4096, 5*1024, 1, fun tango_dt_queue_int/3).
|
4096, 5*1024, 1, fun tango_dt_queue_int/3).
|
||||||
|
|
||||||
tango_dt_queue_int(PageSize, Seq, Proj) ->
|
tango_dt_queue_int(PageSize, Seq, Proj) ->
|
||||||
|
MOD = tango_dt_queue,
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
|
|
||||||
{ok, Q1Num} = tango_oid:new(OID_Map, "queue1"),
|
{ok, Q1Num} = tango_oid:new(OID_Map, "queue1"),
|
||||||
{ok, Q1} = tango_dt_queue:start_link(PageSize, Seq, Proj, Q1Num),
|
{ok, Q1} = MOD:start_link(PageSize, Seq, Proj, Q1Num),
|
||||||
|
|
||||||
{ok, true} = tango_dt_queue:is_empty(Q1),
|
{ok, true} = MOD:is_empty(Q1),
|
||||||
{ok, 0} = tango_dt_queue:length(Q1),
|
{ok, 0} = MOD:length(Q1),
|
||||||
Num1 = 5,
|
Num1 = 15,
|
||||||
Seq1 = lists:seq(1, Num1),
|
Seq1 = lists:seq(1, Num1),
|
||||||
RevSeq1 = lists:reverse(Seq1),
|
RevSeq1 = lists:reverse(Seq1),
|
||||||
[ok = tango_dt_queue:in(Q1, X) || X <- Seq1],
|
[ok = MOD:in(Q1, X) || X <- Seq1],
|
||||||
{ok, Num1} = tango_dt_queue:length(Q1),
|
{ok, Num1} = MOD:length(Q1),
|
||||||
{ok, {value, 1}} = tango_dt_queue:peek(Q1),
|
{ok, {value, 1}} = MOD:peek(Q1),
|
||||||
{ok, Seq1} = tango_dt_queue:to_list(Q1),
|
{ok, Seq1} = MOD:to_list(Q1),
|
||||||
ok = tango_dt_queue:reverse(Q1),
|
ok = MOD:reverse(Q1),
|
||||||
{ok, RevSeq1} = tango_dt_queue:to_list(Q1),
|
{ok, RevSeq1} = MOD:to_list(Q1),
|
||||||
ok = tango_dt_queue:reverse(Q1),
|
ok = MOD:reverse(Q1),
|
||||||
|
|
||||||
[{ok, {value, X}} = tango_dt_queue:out(Q1) || X <- lists:seq(1, Num1)],
|
[{ok, {value, X}} = MOD:out(Q1) || X <- lists:seq(1, Num1)],
|
||||||
{ok, empty} = tango_dt_queue:out(Q1),
|
{ok, empty} = MOD:out(Q1),
|
||||||
{ok, []} = tango_dt_queue:to_list(Q1),
|
{ok, []} = MOD:to_list(Q1),
|
||||||
|
|
||||||
[ok = tango_dt_queue:in(Q1, X) || X <- Seq1],
|
[ok = MOD:in(Q1, X) || X <- Seq1],
|
||||||
{ok, false} = tango_dt_queue:member(Q1, does_not_exist),
|
{ok, false} = MOD:member(Q1, does_not_exist),
|
||||||
{ok, true} = tango_dt_queue:member(Q1, Num1),
|
{ok, true} = MOD:member(Q1, Num1),
|
||||||
ok = tango_dt_queue:filter(Q1, fun(X) when X == Num1 -> false;
|
ok = MOD:filter(Q1, fun(X) when X == Num1 -> false;
|
||||||
(_) -> true
|
(_) -> true
|
||||||
end),
|
end),
|
||||||
{ok, false} = tango_dt_queue:member(Q1, Num1),
|
Num1Minus1 = Num1 - 1,
|
||||||
{ok, true} = tango_dt_queue:member(Q1, Num1 - 1),
|
C1 = fun(Q, Expected) -> {ok, false} = MOD:member(Q, Num1),
|
||||||
|
{ok, true} = MOD:member(Q, Num1 - 1),
|
||||||
|
{ok, Expected} = MOD:length(Q), ok end,
|
||||||
|
ok = C1(Q1, Num1Minus1),
|
||||||
|
|
||||||
|
{ok, Q2} = MOD:start_link(PageSize, Seq, Proj, Q1Num),
|
||||||
|
ok = C1(Q2, Num1Minus1),
|
||||||
|
ok = MOD:in(Q2, 88),
|
||||||
|
ok = C1(Q2, Num1),
|
||||||
|
ok = C1(Q1, Num1),
|
||||||
|
|
||||||
|
[ok = MOD:checkpoint(Q1) || _ <- lists:seq(1, 4)],
|
||||||
|
[ok = C1(X, Num1) || X <- [Q1, Q2]],
|
||||||
|
{ok, Q3} = MOD:start_link(PageSize, Seq, Proj, Q1Num),
|
||||||
|
[ok = C1(X, Num1) || X <- [Q1, Q2, Q3]],
|
||||||
|
{ok, Q4} = MOD:start_link(PageSize, Seq, Proj, Q1Num),
|
||||||
|
ok = MOD:in(Q4, 89),
|
||||||
|
Num1Plus1 = Num1 + 1,
|
||||||
|
[ok = C1(X, Num1Plus1) || X <- [Q1, Q2, Q3, Q4]],
|
||||||
|
|
||||||
|
[ok = MOD:stop(X) || X <- [Q1, Q2, Q3, Q4]],
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-endif. % not PULSE
|
-endif. % not PULSE
|
||||||
|
|
Loading…
Reference in a new issue