All tests pass, but checkpointing does not truncate history
This commit is contained in:
parent
fed2f43783
commit
7bf98fa648
4 changed files with 18 additions and 20 deletions
|
@ -31,7 +31,8 @@
|
||||||
scan_backward/5,
|
scan_backward/5,
|
||||||
pad_bin/2,
|
pad_bin/2,
|
||||||
append_page/3,
|
append_page/3,
|
||||||
back_ps2last_lpn/1]).
|
back_ps2last_lpn/1,
|
||||||
|
append_lpns/2]).
|
||||||
|
|
||||||
-define(MAGIC_NUMBER_V1, 16#88990011).
|
-define(MAGIC_NUMBER_V1, 16#88990011).
|
||||||
|
|
||||||
|
@ -210,3 +211,9 @@ back_ps2last_lpn([]) ->
|
||||||
0;
|
0;
|
||||||
back_ps2last_lpn([H|_]) ->
|
back_ps2last_lpn([H|_]) ->
|
||||||
H.
|
H.
|
||||||
|
|
||||||
|
append_lpns([], BPs) ->
|
||||||
|
BPs;
|
||||||
|
append_lpns(LPNs, BPs) ->
|
||||||
|
lists:reverse(LPNs) ++ BPs.
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ checkpoint(Pid) ->
|
||||||
init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) ->
|
init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid, StreamNum),
|
LastLPN = find_last_lpn(SequencerPid, StreamNum),
|
||||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, StreamNum),
|
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, StreamNum),
|
||||||
BackPs = lists:reverse(LPNs),
|
BackPs = tango:append_lpns(LPNs, []),
|
||||||
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
|
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
|
||||||
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false),
|
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false),
|
||||||
{ok, #state{page_size=PageSize,
|
{ok, #state{page_size=PageSize,
|
||||||
|
@ -144,7 +144,6 @@ find_last_lpn(SequencerPid, StreamNum) ->
|
||||||
|
|
||||||
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
|
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
|
||||||
when LastLPN >= StopAtLPN ->
|
when LastLPN >= StopAtLPN ->
|
||||||
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
|
||||||
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
|
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
|
||||||
StopAtLPN, true),
|
StopAtLPN, true),
|
||||||
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
||||||
|
@ -162,11 +161,8 @@ roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
|
||||||
last_fetch_lpn=StopAtLPN} = State) ->
|
last_fetch_lpn=StopAtLPN} = State) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid, StreamNum),
|
LastLPN = find_last_lpn(SequencerPid, StreamNum),
|
||||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum),
|
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum),
|
||||||
NewBPs = append_lpns(LPNs, BackPs),
|
NewBackPs = tango:append_lpns(LPNs, BackPs),
|
||||||
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
LastFetchLPN = tango:back_ps2last_lpn(NewBackPs),
|
||||||
|
play_log_pages(Pages, true,
|
||||||
append_lpns([], BPs) ->
|
State#state{all_back_ps=NewBackPs,
|
||||||
BPs;
|
last_fetch_lpn=LastFetchLPN}).
|
||||||
append_lpns(LPNs, BPs) ->
|
|
||||||
lists:reverse(LPNs) ++ BPs.
|
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ get(Pid, Key) ->
|
||||||
init([PageSize, SequencerPid, Proj]) ->
|
init([PageSize, SequencerPid, Proj]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
|
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
|
||||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, ?OID_STREAM_NUMBER),
|
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, ?OID_STREAM_NUMBER),
|
||||||
BackPs = lists:reverse(LPNs),
|
BackPs = tango:append_lpns(LPNs, []),
|
||||||
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
|
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
|
||||||
I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
|
I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
|
||||||
{ok, #state{page_size=PageSize,
|
{ok, #state{page_size=PageSize,
|
||||||
|
@ -127,7 +127,6 @@ find_last_lpn(SequencerPid, StreamNum) ->
|
||||||
|
|
||||||
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
|
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
|
||||||
when LastLPN >= StopAtLPN ->
|
when LastLPN >= StopAtLPN ->
|
||||||
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
|
||||||
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
|
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
|
||||||
StopAtLPN, true),
|
StopAtLPN, true),
|
||||||
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
||||||
|
@ -144,14 +143,9 @@ roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
|
||||||
last_fetch_lpn=StopAtLPN} = State) ->
|
last_fetch_lpn=StopAtLPN} = State) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
|
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
|
||||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, ?OID_STREAM_NUMBER),
|
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, ?OID_STREAM_NUMBER),
|
||||||
NewBPs = append_lpns(LPNs, BackPs),
|
NewBPs = tango:append_lpns(LPNs, BackPs),
|
||||||
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
||||||
|
|
||||||
append_lpns([], BPs) ->
|
|
||||||
BPs;
|
|
||||||
append_lpns(LPNs, BPs) ->
|
|
||||||
lists:reverse(LPNs) ++ BPs.
|
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
-record(oid_map, {
|
-record(oid_map, {
|
||||||
|
|
|
@ -260,7 +260,8 @@ tango_dt_queue_int(PageSize, Seq, Proj) ->
|
||||||
|
|
||||||
{ok, true} = MOD:is_empty(Q1),
|
{ok, true} = MOD:is_empty(Q1),
|
||||||
{ok, 0} = MOD:length(Q1),
|
{ok, 0} = MOD:length(Q1),
|
||||||
Num1 = 15,
|
|
||||||
|
Num1 = 4,
|
||||||
Seq1 = lists:seq(1, Num1),
|
Seq1 = lists:seq(1, Num1),
|
||||||
RevSeq1 = lists:reverse(Seq1),
|
RevSeq1 = lists:reverse(Seq1),
|
||||||
[ok = MOD:in(Q1, X) || X <- Seq1],
|
[ok = MOD:in(Q1, X) || X <- Seq1],
|
||||||
|
|
Loading…
Reference in a new issue