WIP: all but queue checkpointing now passes

This commit is contained in:
Scott Lystig Fritchie 2014-09-20 23:54:03 +09:00
parent 0b3bb3ee7c
commit fed2f43783
7 changed files with 83 additions and 111 deletions

View file

@ -30,7 +30,8 @@
scan_backward/4,
scan_backward/5,
pad_bin/2,
append_page/3]).
append_page/3,
back_ps2last_lpn/1]).
-define(MAGIC_NUMBER_V1, 16#88990011).
@ -87,8 +88,8 @@ add_back_pointer(BackPs, New) ->
[New|BackPs].
convert_options_list2int(Options) ->
lists:foldl(fun(to_final_page, Int) -> Int + 1;
(_, Int) -> Int
lists:foldl(fun(t_final_page, Int) -> Int + 1;
(_, Int) -> Int
end, 0, Options).
scan_backward(Proj, Stream, LastLPN, WithPagesP) ->
@ -161,7 +162,7 @@ append_page(#proj{seq={Sequencer,_,_}, page_size=PageSize} = Proj,
StreamList),
%% pulse_tracing_add(write, LPN),
StreamBackPs = lists:zip(StreamList, BackPsList),
Page = tango:pack_v1(StreamBackPs, [to_final_page],
Page = tango:pack_v1(StreamBackPs, [t_final_page],
OrigPage, PageSize),
append_page1(Proj, LPN, Page, StreamList, 5, OrigPage)
catch
@ -204,3 +205,8 @@ append_page2(Proj, LPN, Page) ->
XX
%% Let it crash: error_unwritten
end.
back_ps2last_lpn([]) ->
0;
back_ps2last_lpn([H|_]) ->
H.

View file

@ -39,10 +39,10 @@
page_size :: non_neg_integer(), % Corfurl page size
seq :: pid(), % sequencer pid
proj :: term(), % projection
stream_num :: non_neg_integer(), % this instance's OID number
stream_num :: non_neg_integer(), % this instance's OID number
cb_mod :: atom(), % callback module
last_read_lpn :: lpn(), %
back_ps :: [lpn()], % back pointers (up to 4)
last_fetch_lpn :: lpn(), %
all_back_ps :: [lpn()], % All back-pointers LIFO order!
i_state :: term() % internal state thingie
}).
@ -53,8 +53,7 @@
-callback do_pure_op(term(), callback_i_state()) -> term().
-callback do_dirty_op(term(), gen_server_from(), callback_i_state(),
StreamNum::non_neg_integer(),
Proj0::term(), PageSize::non_neg_integer(),
BackPs::list()) ->
Proj0::term(), PageSize::non_neg_integer()) ->
{Reply::term(), New_I_State::callback_i_state(),
Proj::term(), LPN::non_neg_integer(), NewBackPs::list()}.
-callback play_log_mutate_i_state([binary()], boolean(), callback_i_state()) ->
@ -74,31 +73,32 @@ checkpoint(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) ->
LastLPN = find_last_lpn(SequencerPid),
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, [], StreamNum),
LastLPN = find_last_lpn(SequencerPid, StreamNum),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, StreamNum),
BackPs = lists:reverse(LPNs),
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false),
{ok, #state{page_size=PageSize,
seq=SequencerPid,
proj=Proj,
cb_mod=CallbackMod,
stream_num=StreamNum,
last_read_lpn=LastLPN,
back_ps=BackPs,
last_fetch_lpn=LastFetchLPN,
all_back_ps=BackPs,
i_state=I_State}}.
handle_call({cb_dirty_op, Op}, From,
#state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum,
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State)->
{AsyncType, I_State2, Proj1, LPN, NewBackPs} =
page_size=PageSize, i_state=I_State}=State)->
{AsyncType, I_State2, Proj1, _LPN} =
CallbackMod:do_dirty_op(Op, From, I_State, StreamNum,
Proj0, PageSize, BackPs),
Proj0, PageSize),
State2 = State#state{i_state=I_State2,
proj=Proj1,
back_ps=NewBackPs},
proj=Proj1},
if AsyncType == op_t_async ->
{reply, ok, State2};
AsyncType == op_t_sync ->
State3 = roll_log_forward(LPN, State2),
State3 = roll_log_forward(State2),
{noreply, State3}
end;
handle_call({cb_pure_op, Op}, _From, #state{cb_mod=CallbackMod} = State) ->
@ -109,15 +109,14 @@ handle_call({sync_checkpoint}, From,
#state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum,
page_size=PageSize, i_state=I_State}=State)->
CheckpointOps = CallbackMod:do_checkpoint(I_State),
CheckpointBackPs = [],
{_OpT, I_State2, Proj1, _LPN, NewBackPs} =
%% CheckpointBackPs = [],
{_OpT, I_State2, Proj1, _LPN} =
CallbackMod:do_dirty_op(CheckpointOps, From, I_State, StreamNum,
Proj0, PageSize, CheckpointBackPs),
Proj0, PageSize),
%% TODO: Use this LPN so that we can tell the corfurl log GC
%% that we have created some dead bytes in the log.
{reply, ok, State#state{i_state=I_State2,
proj=Proj1,
back_ps=NewBackPs}};
proj=Proj1}};
handle_call({stop}, _From, State) ->
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
@ -138,34 +137,17 @@ code_change(_OldVsn, State, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
find_last_lpn(#state{seq=SequencerPid}) ->
find_last_lpn(SequencerPid);
find_last_lpn(SequencerPid) ->
{ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
CurrentLPN - 1.
find_last_lpn(SequencerPid, StreamNum) ->
{ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid,
0, [StreamNum]),
tango:back_ps2last_lpn(BackPs).
fetch_unread_pages(#state{seq=SequencerPid, stream_num=StreamNum} = State) ->
{ok, [LastLPN]} = corfurl_sequencer:get_tails(SequencerPid,
[StreamNum]),
fetch_unread_pages2(LastLPN, State).
fetch_unread_pages(LastLPN, State) ->
fetch_unread_pages2(LastLPN, State).
fetch_unread_pages2(LastLPN,
#state{proj=Proj, stream_num=StreamNum, back_ps=OldBackPs,
last_read_lpn=StopAtLPN} = State) ->
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN,
OldBackPs, StreamNum),
{Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}.
fetch_unread_pages(Proj, LastLPN, StopAtLPN, OldBackPs, StreamNum) ->
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
when LastLPN >= StopAtLPN ->
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
StopAtLPN, true),
{LPNs, Pages} = lists:unzip(LPNandPages),
BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end,
OldBackPs, LPNs),
{BackPs, Pages}.
{_LPNs, _Pages} = lists:unzip(LPNandPages).
play_log_pages(Pages, SideEffectsP,
#state{cb_mod=CallbackMod, i_state=I_State} = State) ->
@ -175,11 +157,16 @@ play_log_pages(Pages, SideEffectsP,
play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State).
roll_log_forward(State) ->
{Pages, State2} = fetch_unread_pages(State),
play_log_pages(Pages, true, State2).
roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
stream_num=StreamNum,
last_fetch_lpn=StopAtLPN} = State) ->
LastLPN = find_last_lpn(SequencerPid, StreamNum),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum),
NewBPs = append_lpns(LPNs, BackPs),
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
roll_log_forward(MaybeStartingLPN, State) ->
{Pages, State2} = fetch_unread_pages(MaybeStartingLPN, State),
play_log_pages(Pages, true, State2).
append_lpns([], BPs) ->
BPs;
append_lpns(LPNs, BPs) ->
lists:reverse(LPNs) ++ BPs.

View file

@ -28,7 +28,7 @@
%% Tango datatype callbacks
-export([fresh/0,
do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
do_pure_op/2, do_dirty_op/6, do_checkpoint/1,
play_log_mutate_i_state/3]).
-define(DICTMOD, dict).
@ -61,16 +61,13 @@ do_pure_op({o_get, Key}, Dict) ->
?DICTMOD:find(Key, Dict).
do_dirty_op(Op0, _From,
I_State, StreamNum, Proj0, PageSize, BackPs) ->
I_State, StreamNum, Proj0, ___TODO_delme_PageSize) ->
Op = if is_list(Op0) -> Op0;
true -> [Op0] % always make a list
end,
Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN),
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
{{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]),
{op_t_async, I_State, Proj1, LPN}.
do_checkpoint(Dict=_I_State) ->
[{o_start_checkpoint}|[{o_set, X, Y} || {X, Y} <- ?DICTMOD:to_list(Dict)]].

View file

@ -29,7 +29,7 @@
%% Tango datatype callbacks
-export([fresh/0,
do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
do_pure_op/2, do_dirty_op/6, do_checkpoint/1,
play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000).
@ -91,14 +91,11 @@ do_pure_op({o_member, X}, Q) ->
{ok, queue:member(X, Q)}.
do_dirty_op(Op0, From,
I_State, StreamNum, Proj0, PageSize, BackPs) ->
I_State, StreamNum, Proj0, ___TODO_delme_PageSize) ->
{AsyncType, Op} = transform_dirty_op(Op0, From),
Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN),
{AsyncType, I_State, Proj1, LPN, NewBackPs}.
{{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]),
{AsyncType, I_State, Proj1, LPN}.
do_checkpoint(Q=_I_State) ->
[{o_start_checkpoint}|[{o_in, X} || X <- queue:to_list(Q)]].

View file

@ -28,7 +28,7 @@
%% Tango datatype callbacks
-export([fresh/0,
do_pure_op/2, do_dirty_op/7, do_checkpoint/1,
do_pure_op/2, do_dirty_op/6, do_checkpoint/1,
play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000).
@ -58,16 +58,13 @@ do_pure_op({o_get}, Register) ->
{ok, Register}.
do_dirty_op(Op0, _From,
I_State, StreamNum, Proj0, PageSize, BackPs) ->
I_State, StreamNum, Proj0, ___TODO_delme_PageSize) ->
Op = if is_list(Op0) -> Op0;
true -> [Op0] % always make a list
end,
Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN),
{op_t_async, I_State, Proj1, LPN, NewBackPs}.
{{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]),
{op_t_async, I_State, Proj1, LPN}.
do_checkpoint(Register=_I_State) ->
[{o_start_checkpoint},{o_set, Register}].

View file

@ -32,7 +32,7 @@
%% Tango datatype callbacks (prototype)
-export([fresh/0,
do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]).
do_pure_op/2, do_dirty_op/4, play_log_mutate_i_state/3]).
-define(SERVER, ?MODULE).
-define(OID_STREAM_NUMBER, 0).
@ -68,10 +68,10 @@ get(Pid, Key) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([PageSize, SequencerPid, Proj]) ->
LastLPN = find_last_lpn(SequencerPid),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, ?OID_STREAM_NUMBER),
BackPs = lists:reverse(LPNs),
LastFetchLPN = back_ps2last_lpn(BackPs),
LastFetchLPN = tango:back_ps2last_lpn(BackPs),
I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
{ok, #state{page_size=PageSize,
seq=SequencerPid,
@ -81,10 +81,10 @@ init([PageSize, SequencerPid, Proj]) ->
i_state=I_State}}.
handle_call({new, Key}, From,
#state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) ->
#state{proj=Proj0, i_state=I_State}=State) ->
Op = {new_oid, Key, From, 0},
{_Res, I_State2, Proj1, _LPN} =
do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize),
do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0),
%% Let's see how much trouble we can get outselves in here.
%% If we're here, then we've written to the log without error.
%% So then the cast to roll forward must see that log entry
@ -120,28 +120,15 @@ code_change(_OldVsn, State, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
back_ps2last_lpn([]) ->
0;
back_ps2last_lpn([H|_]) ->
H.
find_last_lpn(SequencerPid, StreamNum) ->
{ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid,
0, [StreamNum]),
tango:back_ps2last_lpn(BackPs).
find_last_lpn(#state{seq=SequencerPid}) ->
find_last_lpn(SequencerPid);
find_last_lpn(SequencerPid) ->
{ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid, 0, [?OID_STREAM_NUMBER]),
back_ps2last_lpn(BackPs).
%% fetch_unread_pages(#state{proj=Proj, last_fetch_lpn=StopAtLPN,
%% all_back_ps=BPs} = State) ->
%% LastLPN = find_last_lpn(State),
%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
%% NewBPs = append_lpns(LPNs, BPs),
%% {LPNs, Pages, State#state{last_fetch_lpn=LastLPN, all_back_ps=NewBPs}}.
fetch_unread_pages(Proj, LastLPN, StopAtLPN)
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum)
when LastLPN >= StopAtLPN ->
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
StopAtLPN, true),
{_LPNs, _Pages} = lists:unzip(LPNandPages).
@ -155,8 +142,8 @@ play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
last_fetch_lpn=StopAtLPN} = State) ->
LastLPN = find_last_lpn(SequencerPid),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER),
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, ?OID_STREAM_NUMBER),
NewBPs = append_lpns(LPNs, BackPs),
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
@ -182,7 +169,7 @@ do_pure_op({get, Key}, #oid_map{map=Dict}) ->
?DICTMOD:find(Key, Dict).
do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op,
I_State, StreamNum, Proj0, ___TODO_delme_PageSize) ->
I_State, StreamNum, Proj0) ->
Page = term_to_binary(Op),
{{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]),
{ok, I_State, Proj1, LPN}.

View file

@ -42,11 +42,12 @@
pack_v1_test() ->
[begin
Packed = ?T:pack_v1(StreamList, term_to_binary(Term), Size),
Packed = ?T:pack_v1(StreamList, Options, term_to_binary(Term), Size),
StreamList = ?T:unpack_v1(Packed, stream_list),
TermBin = ?T:unpack_v1(Packed, page),
Term = binary_to_term(TermBin)
end || StreamList <- [[], [1], [1,2,4]],
Options <- [[]],
Term <- [foo, {bar, baz, <<"yo">>}],
Size <- lists:seq(100, 5000, 500)].
@ -67,7 +68,7 @@ smoke_test() ->
smoke_test_int(PageSize, Seq, P1) ->
ok = ?SEQ:set_tails(Seq, [{42,4242}, {43,4343}]),
{ok, [4242, 4343]} = ?SEQ:get_tails(Seq, [42, 43]),
{ok, _, [4242, 4343]} = ?SEQ:get_tails(Seq, 0, [42, 43]),
LPN_Pgs = [{X, ?T:pad_bin(PageSize, term_to_binary({smoke, X}))} ||
X <- lists:seq(1, 5)],
@ -97,16 +98,16 @@ write_forward_test_int(PageSize, _Seq, P1) ->
write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum) ->
write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum, 0).
write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum, Junk) ->
WriteJunk = fun() -> JP0 = tango:pack_v1([], <<>>, PageSize),
{{ok, _}, _} = corfurl_client:append_page(Proj0, JP0)
write_stream_pages(Proj0, Pages, _PageSize, InitialBackPs, StreamNum, Junk) ->
WriteJunk = fun() -> JP0 = <<"blah">>,
{{ok, _}, _} = tango:append_page(Proj0, JP0,
[StreamNum])
end,
F = fun(Page, {Proj1, BackPs}) ->
if Junk band 1 /= 0 -> WriteJunk();
true -> ok end,
FullPage = tango:pack_v1(BackPs, Page, PageSize),
{{ok, LPN}, Proj2} =
corfurl_client:append_page(Proj1, FullPage),
tango:append_page(Proj1, Page, [StreamNum]),
if Junk band 1 /= 0 -> WriteJunk();
true -> ok end,
{Proj2, tango:add_back_pointer(StreamNum, BackPs, LPN)}