Add checkpoint support for tango_dt_map
This commit is contained in:
parent
4cf8ac7ed8
commit
940012cef1
2 changed files with 35 additions and 10 deletions
|
@ -22,12 +22,14 @@
|
||||||
|
|
||||||
-behaviour(tango_dt).
|
-behaviour(tango_dt).
|
||||||
|
|
||||||
-export([start_link/4,
|
-export([start_link/4, stop/1,
|
||||||
set/3, get/2]).
|
set/3, get/2,
|
||||||
|
checkpoint/1]).
|
||||||
|
|
||||||
%% Tango datatype callbacks
|
%% Tango datatype callbacks
|
||||||
-export([fresh/0,
|
-export([fresh/0,
|
||||||
do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]).
|
do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
|
||||||
|
play_log_mutate_i_state/3]).
|
||||||
|
|
||||||
-define(DICTMOD, dict).
|
-define(DICTMOD, dict).
|
||||||
|
|
||||||
|
@ -38,12 +40,19 @@ start_link(PageSize, SequencerPid, Proj, StreamNum) ->
|
||||||
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
|
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
|
||||||
[]).
|
[]).
|
||||||
|
|
||||||
|
stop(Pid) ->
|
||||||
|
tango_dt:stop(Pid).
|
||||||
|
|
||||||
set(Pid, Key, Val) ->
|
set(Pid, Key, Val) ->
|
||||||
gen_server:call(Pid, {cb_dirty_op, {o_set, Key, Val}}, ?LONG_TIME).
|
gen_server:call(Pid, {cb_dirty_op, {o_set, Key, Val}}, ?LONG_TIME).
|
||||||
|
|
||||||
get(Pid, Key) ->
|
get(Pid, Key) ->
|
||||||
gen_server:call(Pid, {cb_pure_op, {o_get, Key}}, ?LONG_TIME).
|
gen_server:call(Pid, {cb_pure_op, {o_get, Key}}, ?LONG_TIME).
|
||||||
|
|
||||||
|
checkpoint(Pid) ->
|
||||||
|
tango_dt:checkpoint(Pid).
|
||||||
|
|
||||||
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
fresh() ->
|
fresh() ->
|
||||||
?DICTMOD:new().
|
?DICTMOD:new().
|
||||||
|
@ -51,8 +60,11 @@ fresh() ->
|
||||||
do_pure_op({o_get, Key}, Dict) ->
|
do_pure_op({o_get, Key}, Dict) ->
|
||||||
?DICTMOD:find(Key, Dict).
|
?DICTMOD:find(Key, Dict).
|
||||||
|
|
||||||
do_dirty_op({o_set, _Key, _Val}=Op, _From,
|
do_dirty_op(Op0, _From,
|
||||||
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
||||||
|
Op = if is_list(Op0) -> Op0;
|
||||||
|
true -> [Op0] % always make a list
|
||||||
|
end,
|
||||||
Page = term_to_binary(Op),
|
Page = term_to_binary(Op),
|
||||||
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
||||||
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
|
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
|
||||||
|
@ -60,10 +72,15 @@ do_dirty_op({o_set, _Key, _Val}=Op, _From,
|
||||||
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
||||||
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
|
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
|
||||||
|
|
||||||
|
do_checkpoint(Dict=_I_State) ->
|
||||||
|
[{o_start_checkpoint}|[{o_set, X, Y} || {X, Y} <- ?DICTMOD:to_list(Dict)]].
|
||||||
|
|
||||||
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
|
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
|
||||||
lists:foldl(fun({o_set, Key, Val}=_Op, Dict) ->
|
lists:foldl(fun({o_set, Key, Val}=_Op, Dict) ->
|
||||||
?DICTMOD:store(Key, Val, Dict)
|
?DICTMOD:store(Key, Val, Dict);
|
||||||
|
({o_start_checkpoint}, _Dict) ->
|
||||||
|
fresh()
|
||||||
end,
|
end,
|
||||||
I_State,
|
I_State,
|
||||||
[binary_to_term(Page) || Page <- Pages]).
|
lists:append([binary_to_term(Page) || Page <- Pages])).
|
||||||
|
|
||||||
|
|
|
@ -222,8 +222,10 @@ tango_dt_map_int(PageSize, Seq, Proj) ->
|
||||||
[tango_dt_map:set(Reg, Key, Val) || Reg <- [Reg1, Reg2],
|
[tango_dt_map:set(Reg, Key, Val) || Reg <- [Reg1, Reg2],
|
||||||
Key <- Keys, Val <- Vals],
|
Key <- Keys, Val <- Vals],
|
||||||
LastVal = lists:last(Vals),
|
LastVal = lists:last(Vals),
|
||||||
[{ok, LastVal} = tango_dt_map:get(Reg1, Key) || Key <- Keys],
|
C1 = fun(R, LV) -> [{ok, LV} = tango_dt_map:get(R, Key) || Key <- Keys],
|
||||||
[{ok, LastVal} = tango_dt_map:get(Reg2, Key) || Key <- Keys],
|
ok end,
|
||||||
|
ok = C1(Reg1, LastVal),
|
||||||
|
ok = C1(Reg2, LastVal),
|
||||||
|
|
||||||
%% If we instantiate a new instance of an existing map, then
|
%% If we instantiate a new instance of an existing map, then
|
||||||
%% a single get should show the most recent modification.
|
%% a single get should show the most recent modification.
|
||||||
|
@ -233,8 +235,14 @@ tango_dt_map_int(PageSize, Seq, Proj) ->
|
||||||
%% instance should also see the update.
|
%% instance should also see the update.
|
||||||
NewVal = {"Heh", "a new value"},
|
NewVal = {"Heh", "a new value"},
|
||||||
[ok = tango_dt_map:set(Reg2, Key, NewVal) || Key <- Keys],
|
[ok = tango_dt_map:set(Reg2, Key, NewVal) || Key <- Keys],
|
||||||
[{ok, NewVal} = tango_dt_map:get(Reg2b, Key) || Key <- Keys],
|
[ok = C1(R, NewVal) || R <- [Reg2, Reg2b]],
|
||||||
[{ok, NewVal} = tango_dt_map:get(Reg2, Key) || Key <- Keys], % sanity
|
[ok = C1(R, LastVal) || R <- [Reg1]],
|
||||||
|
|
||||||
|
[ok = tango_dt_map:checkpoint(R) || R <- [Reg1, Reg2, Reg2b]],
|
||||||
|
NewVal2 = "after the checkpoint....",
|
||||||
|
[ok = tango_dt_map:set(Reg2, Key, NewVal2) || Key <- Keys],
|
||||||
|
[ok = C1(R, NewVal2) || R <- [Reg2, Reg2b]],
|
||||||
|
[ok = C1(R, LastVal) || R <- [Reg1]],
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue