From 4cf8ac7ed8d1f3f402d086be55ee51c7a5003388 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 19 Sep 2014 18:12:42 +0900 Subject: [PATCH] Add checkpoint support for tango_dt_queue --- .../tango-prototype/src/tango_dt_queue.erl | 30 ++++++--- .../tango-prototype/src/tango_dt_register.erl | 6 +- prototype/tango-prototype/test/tango_test.erl | 64 ++++++++++++------- 3 files changed, 67 insertions(+), 33 deletions(-) diff --git a/prototype/tango-prototype/src/tango_dt_queue.erl b/prototype/tango-prototype/src/tango_dt_queue.erl index b9555a9..27f14ea 100644 --- a/prototype/tango-prototype/src/tango_dt_queue.erl +++ b/prototype/tango-prototype/src/tango_dt_queue.erl @@ -22,13 +22,15 @@ -behaviour(tango_dt). --export([start_link/4, +-export([start_link/4, stop/1, is_empty/1, length/1, peek/1, to_list/1, member/2, - in/2, out/1, reverse/1, filter/2]). + in/2, out/1, reverse/1, filter/2, + checkpoint/1]). %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]). + do_pure_op/2, do_dirty_op/7, do_checkpoint/1, + play_log_mutate_i_state/3]). -define(LONG_TIME, 30*1000). @@ -39,8 +41,8 @@ start_link(PageSize, SequencerPid, Proj, StreamNum) -> [PageSize, SequencerPid, Proj, ?MODULE, StreamNum], []). -%% set(Pid, Val) -> -%% gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME). +stop(Pid) -> + tango_dt:stop(Pid). is_empty(Pid) -> gen_server:call(Pid, {cb_pure_op, {o_is_empty}}, ?LONG_TIME). @@ -69,6 +71,9 @@ reverse(Pid) -> filter(Pid, Fun) -> gen_server:call(Pid, {cb_dirty_op, {o_filter, Fun}}, ?LONG_TIME). +checkpoint(Pid) -> + tango_dt:checkpoint(Pid). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% fresh() -> @@ -95,6 +100,9 @@ do_dirty_op(Op0, From, NewBackPs = tango:add_back_pointer(BackPs, LPN), {AsyncType, I_State, Proj1, LPN, NewBackPs}. +do_checkpoint(Q=_I_State) -> + [{o_start_checkpoint}|[{o_in, X} || X <- queue:to_list(Q)]]. + play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> lists:foldl(fun({o_in, Val}=_Op, Q) -> queue:in(Val, Q); @@ -109,14 +117,18 @@ play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> ({o_reverse}, Q) -> queue:reverse(Q); ({o_filter, Fun}, Q) -> - queue:filter(Fun, Q) + queue:filter(Fun, Q); + ({o_start_checkpoint}, _Q) -> + fresh() end, I_State, - [binary_to_term(Page) || Page <- Pages]). + lists:append([binary_to_term(Page) || Page <- Pages])). transform_dirty_op({o_out}, From) -> %% This func will be executed on the server side prior to writing %% to the log. - {op_t_sync, {o_out, From, node(), self()}}; + {op_t_sync, [{o_out, From, node(), self()}]}; +transform_dirty_op(OpList, _From) when is_list(OpList) -> + {op_t_async, OpList}; transform_dirty_op(Op, _From) -> - {op_t_async, Op}. + {op_t_async, [Op]}. diff --git a/prototype/tango-prototype/src/tango_dt_register.erl b/prototype/tango-prototype/src/tango_dt_register.erl index 9b65eb2..3b09a8d 100644 --- a/prototype/tango-prototype/src/tango_dt_register.erl +++ b/prototype/tango-prototype/src/tango_dt_register.erl @@ -70,11 +70,13 @@ do_dirty_op(Op0, _From, {op_t_async, I_State, Proj1, LPN, NewBackPs}. do_checkpoint(Register=_I_State) -> - [{o_set, Register}]. + [{o_start_checkpoint},{o_set, Register}]. play_log_mutate_i_state(Pages, _SideEffectsP, OldRegister=_I_State) -> lists:foldl(fun({o_set, Val}=_Op, _OldVal) -> - Val + Val; + ({o_start_checkpoint}, _OldVal) -> + fresh() end, OldRegister, lists:append([binary_to_term(Page) || Page <- Pages])). diff --git a/prototype/tango-prototype/test/tango_test.erl b/prototype/tango-prototype/test/tango_test.erl index e7f3431..27ac1b7 100644 --- a/prototype/tango-prototype/test/tango_test.erl +++ b/prototype/tango-prototype/test/tango_test.erl @@ -243,37 +243,57 @@ tango_dt_queue_test() -> 4096, 5*1024, 1, fun tango_dt_queue_int/3). tango_dt_queue_int(PageSize, Seq, Proj) -> + MOD = tango_dt_queue, {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), {ok, Q1Num} = tango_oid:new(OID_Map, "queue1"), - {ok, Q1} = tango_dt_queue:start_link(PageSize, Seq, Proj, Q1Num), + {ok, Q1} = MOD:start_link(PageSize, Seq, Proj, Q1Num), - {ok, true} = tango_dt_queue:is_empty(Q1), - {ok, 0} = tango_dt_queue:length(Q1), - Num1 = 5, + {ok, true} = MOD:is_empty(Q1), + {ok, 0} = MOD:length(Q1), + Num1 = 15, Seq1 = lists:seq(1, Num1), RevSeq1 = lists:reverse(Seq1), - [ok = tango_dt_queue:in(Q1, X) || X <- Seq1], - {ok, Num1} = tango_dt_queue:length(Q1), - {ok, {value, 1}} = tango_dt_queue:peek(Q1), - {ok, Seq1} = tango_dt_queue:to_list(Q1), - ok = tango_dt_queue:reverse(Q1), - {ok, RevSeq1} = tango_dt_queue:to_list(Q1), - ok = tango_dt_queue:reverse(Q1), + [ok = MOD:in(Q1, X) || X <- Seq1], + {ok, Num1} = MOD:length(Q1), + {ok, {value, 1}} = MOD:peek(Q1), + {ok, Seq1} = MOD:to_list(Q1), + ok = MOD:reverse(Q1), + {ok, RevSeq1} = MOD:to_list(Q1), + ok = MOD:reverse(Q1), - [{ok, {value, X}} = tango_dt_queue:out(Q1) || X <- lists:seq(1, Num1)], - {ok, empty} = tango_dt_queue:out(Q1), - {ok, []} = tango_dt_queue:to_list(Q1), + [{ok, {value, X}} = MOD:out(Q1) || X <- lists:seq(1, Num1)], + {ok, empty} = MOD:out(Q1), + {ok, []} = MOD:to_list(Q1), - [ok = tango_dt_queue:in(Q1, X) || X <- Seq1], - {ok, false} = tango_dt_queue:member(Q1, does_not_exist), - {ok, true} = tango_dt_queue:member(Q1, Num1), - ok = tango_dt_queue:filter(Q1, fun(X) when X == Num1 -> false; - (_) -> true - end), - {ok, false} = tango_dt_queue:member(Q1, Num1), - {ok, true} = tango_dt_queue:member(Q1, Num1 - 1), + [ok = MOD:in(Q1, X) || X <- Seq1], + {ok, false} = MOD:member(Q1, does_not_exist), + {ok, true} = MOD:member(Q1, Num1), + ok = MOD:filter(Q1, fun(X) when X == Num1 -> false; + (_) -> true + end), + Num1Minus1 = Num1 - 1, + C1 = fun(Q, Expected) -> {ok, false} = MOD:member(Q, Num1), + {ok, true} = MOD:member(Q, Num1 - 1), + {ok, Expected} = MOD:length(Q), ok end, + ok = C1(Q1, Num1Minus1), + {ok, Q2} = MOD:start_link(PageSize, Seq, Proj, Q1Num), + ok = C1(Q2, Num1Minus1), + ok = MOD:in(Q2, 88), + ok = C1(Q2, Num1), + ok = C1(Q1, Num1), + + [ok = MOD:checkpoint(Q1) || _ <- lists:seq(1, 4)], + [ok = C1(X, Num1) || X <- [Q1, Q2]], + {ok, Q3} = MOD:start_link(PageSize, Seq, Proj, Q1Num), + [ok = C1(X, Num1) || X <- [Q1, Q2, Q3]], + {ok, Q4} = MOD:start_link(PageSize, Seq, Proj, Q1Num), + ok = MOD:in(Q4, 89), + Num1Plus1 = Num1 + 1, + [ok = C1(X, Num1Plus1) || X <- [Q1, Q2, Q3, Q4]], + + [ok = MOD:stop(X) || X <- [Q1, Q2, Q3, Q4]], ok. -endif. % not PULSE