diff --git a/prototype/tango-prototype/src/tango.erl b/prototype/tango-prototype/src/tango.erl index 3ce586b..d735911 100644 --- a/prototype/tango-prototype/src/tango.erl +++ b/prototype/tango-prototype/src/tango.erl @@ -30,7 +30,8 @@ scan_backward/4, scan_backward/5, pad_bin/2, - append_page/3]). + append_page/3, + back_ps2last_lpn/1]). -define(MAGIC_NUMBER_V1, 16#88990011). @@ -87,8 +88,8 @@ add_back_pointer(BackPs, New) -> [New|BackPs]. convert_options_list2int(Options) -> - lists:foldl(fun(to_final_page, Int) -> Int + 1; - (_, Int) -> Int + lists:foldl(fun(t_final_page, Int) -> Int + 1; + (_, Int) -> Int end, 0, Options). scan_backward(Proj, Stream, LastLPN, WithPagesP) -> @@ -161,7 +162,7 @@ append_page(#proj{seq={Sequencer,_,_}, page_size=PageSize} = Proj, StreamList), %% pulse_tracing_add(write, LPN), StreamBackPs = lists:zip(StreamList, BackPsList), - Page = tango:pack_v1(StreamBackPs, [to_final_page], + Page = tango:pack_v1(StreamBackPs, [t_final_page], OrigPage, PageSize), append_page1(Proj, LPN, Page, StreamList, 5, OrigPage) catch @@ -204,3 +205,8 @@ append_page2(Proj, LPN, Page) -> XX %% Let it crash: error_unwritten end. + +back_ps2last_lpn([]) -> + 0; +back_ps2last_lpn([H|_]) -> + H. diff --git a/prototype/tango-prototype/src/tango_dt.erl b/prototype/tango-prototype/src/tango_dt.erl index 066066c..e0be1bb 100644 --- a/prototype/tango-prototype/src/tango_dt.erl +++ b/prototype/tango-prototype/src/tango_dt.erl @@ -39,10 +39,10 @@ page_size :: non_neg_integer(), % Corfurl page size seq :: pid(), % sequencer pid proj :: term(), % projection - stream_num :: non_neg_integer(), % this instance's OID number + stream_num :: non_neg_integer(), % this instance's OID number cb_mod :: atom(), % callback module - last_read_lpn :: lpn(), % - back_ps :: [lpn()], % back pointers (up to 4) + last_fetch_lpn :: lpn(), % + all_back_ps :: [lpn()], % All back-pointers LIFO order! i_state :: term() % internal state thingie }). @@ -53,8 +53,7 @@ -callback do_pure_op(term(), callback_i_state()) -> term(). -callback do_dirty_op(term(), gen_server_from(), callback_i_state(), StreamNum::non_neg_integer(), - Proj0::term(), PageSize::non_neg_integer(), - BackPs::list()) -> + Proj0::term(), PageSize::non_neg_integer()) -> {Reply::term(), New_I_State::callback_i_state(), Proj::term(), LPN::non_neg_integer(), NewBackPs::list()}. -callback play_log_mutate_i_state([binary()], boolean(), callback_i_state()) -> @@ -74,31 +73,32 @@ checkpoint(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) -> - LastLPN = find_last_lpn(SequencerPid), - {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, [], StreamNum), + LastLPN = find_last_lpn(SequencerPid, StreamNum), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, StreamNum), + BackPs = lists:reverse(LPNs), + LastFetchLPN = tango:back_ps2last_lpn(BackPs), I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false), {ok, #state{page_size=PageSize, seq=SequencerPid, proj=Proj, cb_mod=CallbackMod, stream_num=StreamNum, - last_read_lpn=LastLPN, - back_ps=BackPs, + last_fetch_lpn=LastFetchLPN, + all_back_ps=BackPs, i_state=I_State}}. handle_call({cb_dirty_op, Op}, From, #state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum, - page_size=PageSize, back_ps=BackPs, i_state=I_State}=State)-> - {AsyncType, I_State2, Proj1, LPN, NewBackPs} = + page_size=PageSize, i_state=I_State}=State)-> + {AsyncType, I_State2, Proj1, _LPN} = CallbackMod:do_dirty_op(Op, From, I_State, StreamNum, - Proj0, PageSize, BackPs), + Proj0, PageSize), State2 = State#state{i_state=I_State2, - proj=Proj1, - back_ps=NewBackPs}, + proj=Proj1}, if AsyncType == op_t_async -> {reply, ok, State2}; AsyncType == op_t_sync -> - State3 = roll_log_forward(LPN, State2), + State3 = roll_log_forward(State2), {noreply, State3} end; handle_call({cb_pure_op, Op}, _From, #state{cb_mod=CallbackMod} = State) -> @@ -109,15 +109,14 @@ handle_call({sync_checkpoint}, From, #state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum, page_size=PageSize, i_state=I_State}=State)-> CheckpointOps = CallbackMod:do_checkpoint(I_State), - CheckpointBackPs = [], - {_OpT, I_State2, Proj1, _LPN, NewBackPs} = + %% CheckpointBackPs = [], + {_OpT, I_State2, Proj1, _LPN} = CallbackMod:do_dirty_op(CheckpointOps, From, I_State, StreamNum, - Proj0, PageSize, CheckpointBackPs), + Proj0, PageSize), %% TODO: Use this LPN so that we can tell the corfurl log GC %% that we have created some dead bytes in the log. {reply, ok, State#state{i_state=I_State2, - proj=Proj1, - back_ps=NewBackPs}}; + proj=Proj1}}; handle_call({stop}, _From, State) -> {stop, normal, ok, State}; handle_call(_Request, _From, State) -> @@ -138,34 +137,17 @@ code_change(_OldVsn, State, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -find_last_lpn(#state{seq=SequencerPid}) -> - find_last_lpn(SequencerPid); -find_last_lpn(SequencerPid) -> - {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), - CurrentLPN - 1. +find_last_lpn(SequencerPid, StreamNum) -> + {ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid, + 0, [StreamNum]), + tango:back_ps2last_lpn(BackPs). -fetch_unread_pages(#state{seq=SequencerPid, stream_num=StreamNum} = State) -> - {ok, [LastLPN]} = corfurl_sequencer:get_tails(SequencerPid, - [StreamNum]), - fetch_unread_pages2(LastLPN, State). - -fetch_unread_pages(LastLPN, State) -> - fetch_unread_pages2(LastLPN, State). - -fetch_unread_pages2(LastLPN, - #state{proj=Proj, stream_num=StreamNum, back_ps=OldBackPs, - last_read_lpn=StopAtLPN} = State) -> - {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, - OldBackPs, StreamNum), - {Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}. - -fetch_unread_pages(Proj, LastLPN, StopAtLPN, OldBackPs, StreamNum) -> +fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum) + when LastLPN >= StopAtLPN -> + %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN, StopAtLPN, true), - {LPNs, Pages} = lists:unzip(LPNandPages), - BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end, - OldBackPs, LPNs), - {BackPs, Pages}. + {_LPNs, _Pages} = lists:unzip(LPNandPages). play_log_pages(Pages, SideEffectsP, #state{cb_mod=CallbackMod, i_state=I_State} = State) -> @@ -175,11 +157,16 @@ play_log_pages(Pages, SideEffectsP, play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). -roll_log_forward(State) -> - {Pages, State2} = fetch_unread_pages(State), - play_log_pages(Pages, true, State2). +roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs, + stream_num=StreamNum, + last_fetch_lpn=StopAtLPN} = State) -> + LastLPN = find_last_lpn(SequencerPid, StreamNum), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum), + NewBPs = append_lpns(LPNs, BackPs), + play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}). -roll_log_forward(MaybeStartingLPN, State) -> - {Pages, State2} = fetch_unread_pages(MaybeStartingLPN, State), - play_log_pages(Pages, true, State2). +append_lpns([], BPs) -> + BPs; +append_lpns(LPNs, BPs) -> + lists:reverse(LPNs) ++ BPs. diff --git a/prototype/tango-prototype/src/tango_dt_map.erl b/prototype/tango-prototype/src/tango_dt_map.erl index eed5ebf..2179302 100644 --- a/prototype/tango-prototype/src/tango_dt_map.erl +++ b/prototype/tango-prototype/src/tango_dt_map.erl @@ -28,7 +28,7 @@ %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/7, do_checkpoint/1, + do_pure_op/2, do_dirty_op/6, do_checkpoint/1, play_log_mutate_i_state/3]). -define(DICTMOD, dict). @@ -61,16 +61,13 @@ do_pure_op({o_get, Key}, Dict) -> ?DICTMOD:find(Key, Dict). do_dirty_op(Op0, _From, - I_State, StreamNum, Proj0, PageSize, BackPs) -> + I_State, StreamNum, Proj0, ___TODO_delme_PageSize) -> Op = if is_list(Op0) -> Op0; true -> [Op0] % always make a list end, Page = term_to_binary(Op), - FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, - [StreamNum]), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {op_t_async, I_State, Proj1, LPN, NewBackPs}. + {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), + {op_t_async, I_State, Proj1, LPN}. do_checkpoint(Dict=_I_State) -> [{o_start_checkpoint}|[{o_set, X, Y} || {X, Y} <- ?DICTMOD:to_list(Dict)]]. diff --git a/prototype/tango-prototype/src/tango_dt_queue.erl b/prototype/tango-prototype/src/tango_dt_queue.erl index 27f14ea..e1f0dbb 100644 --- a/prototype/tango-prototype/src/tango_dt_queue.erl +++ b/prototype/tango-prototype/src/tango_dt_queue.erl @@ -29,7 +29,7 @@ %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/7, do_checkpoint/1, + do_pure_op/2, do_dirty_op/6, do_checkpoint/1, play_log_mutate_i_state/3]). -define(LONG_TIME, 30*1000). @@ -91,14 +91,11 @@ do_pure_op({o_member, X}, Q) -> {ok, queue:member(X, Q)}. do_dirty_op(Op0, From, - I_State, StreamNum, Proj0, PageSize, BackPs) -> + I_State, StreamNum, Proj0, ___TODO_delme_PageSize) -> {AsyncType, Op} = transform_dirty_op(Op0, From), Page = term_to_binary(Op), - FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, - [StreamNum]), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {AsyncType, I_State, Proj1, LPN, NewBackPs}. + {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), + {AsyncType, I_State, Proj1, LPN}. do_checkpoint(Q=_I_State) -> [{o_start_checkpoint}|[{o_in, X} || X <- queue:to_list(Q)]]. diff --git a/prototype/tango-prototype/src/tango_dt_register.erl b/prototype/tango-prototype/src/tango_dt_register.erl index 3b09a8d..89c4209 100644 --- a/prototype/tango-prototype/src/tango_dt_register.erl +++ b/prototype/tango-prototype/src/tango_dt_register.erl @@ -28,7 +28,7 @@ %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/7, do_checkpoint/1, + do_pure_op/2, do_dirty_op/6, do_checkpoint/1, play_log_mutate_i_state/3]). -define(LONG_TIME, 30*1000). @@ -58,16 +58,13 @@ do_pure_op({o_get}, Register) -> {ok, Register}. do_dirty_op(Op0, _From, - I_State, StreamNum, Proj0, PageSize, BackPs) -> + I_State, StreamNum, Proj0, ___TODO_delme_PageSize) -> Op = if is_list(Op0) -> Op0; true -> [Op0] % always make a list end, Page = term_to_binary(Op), - FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, - [StreamNum]), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {op_t_async, I_State, Proj1, LPN, NewBackPs}. + {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), + {op_t_async, I_State, Proj1, LPN}. do_checkpoint(Register=_I_State) -> [{o_start_checkpoint},{o_set, Register}]. diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index e99e7a0..811694e 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -32,7 +32,7 @@ %% Tango datatype callbacks (prototype) -export([fresh/0, - do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]). + do_pure_op/2, do_dirty_op/4, play_log_mutate_i_state/3]). -define(SERVER, ?MODULE). -define(OID_STREAM_NUMBER, 0). @@ -68,10 +68,10 @@ get(Pid, Key) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init([PageSize, SequencerPid, Proj]) -> - LastLPN = find_last_lpn(SequencerPid), - {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), + LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0, ?OID_STREAM_NUMBER), BackPs = lists:reverse(LPNs), - LastFetchLPN = back_ps2last_lpn(BackPs), + LastFetchLPN = tango:back_ps2last_lpn(BackPs), I_State = play_log_pages(Pages, fresh(), ?MODULE, false), {ok, #state{page_size=PageSize, seq=SequencerPid, @@ -81,10 +81,10 @@ init([PageSize, SequencerPid, Proj]) -> i_state=I_State}}. handle_call({new, Key}, From, - #state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) -> + #state{proj=Proj0, i_state=I_State}=State) -> Op = {new_oid, Key, From, 0}, {_Res, I_State2, Proj1, _LPN} = - do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize), + do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0), %% Let's see how much trouble we can get outselves in here. %% If we're here, then we've written to the log without error. %% So then the cast to roll forward must see that log entry @@ -120,28 +120,15 @@ code_change(_OldVsn, State, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -back_ps2last_lpn([]) -> - 0; -back_ps2last_lpn([H|_]) -> - H. +find_last_lpn(SequencerPid, StreamNum) -> + {ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid, + 0, [StreamNum]), + tango:back_ps2last_lpn(BackPs). -find_last_lpn(#state{seq=SequencerPid}) -> - find_last_lpn(SequencerPid); -find_last_lpn(SequencerPid) -> - {ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid, 0, [?OID_STREAM_NUMBER]), - back_ps2last_lpn(BackPs). - -%% fetch_unread_pages(#state{proj=Proj, last_fetch_lpn=StopAtLPN, -%% all_back_ps=BPs} = State) -> -%% LastLPN = find_last_lpn(State), -%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), -%% NewBPs = append_lpns(LPNs, BPs), -%% {LPNs, Pages, State#state{last_fetch_lpn=LastLPN, all_back_ps=NewBPs}}. - -fetch_unread_pages(Proj, LastLPN, StopAtLPN) +fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum) when LastLPN >= StopAtLPN -> %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), - LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, + LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN, StopAtLPN, true), {_LPNs, _Pages} = lists:unzip(LPNandPages). @@ -155,8 +142,8 @@ play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs, last_fetch_lpn=StopAtLPN} = State) -> - LastLPN = find_last_lpn(SequencerPid), - {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), + LastLPN = find_last_lpn(SequencerPid, ?OID_STREAM_NUMBER), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, ?OID_STREAM_NUMBER), NewBPs = append_lpns(LPNs, BackPs), play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}). @@ -182,7 +169,7 @@ do_pure_op({get, Key}, #oid_map{map=Dict}) -> ?DICTMOD:find(Key, Dict). do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op, - I_State, StreamNum, Proj0, ___TODO_delme_PageSize) -> + I_State, StreamNum, Proj0) -> Page = term_to_binary(Op), {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), {ok, I_State, Proj1, LPN}. diff --git a/prototype/tango-prototype/test/tango_test.erl b/prototype/tango-prototype/test/tango_test.erl index 3c36430..1c50f58 100644 --- a/prototype/tango-prototype/test/tango_test.erl +++ b/prototype/tango-prototype/test/tango_test.erl @@ -42,11 +42,12 @@ pack_v1_test() -> [begin - Packed = ?T:pack_v1(StreamList, term_to_binary(Term), Size), + Packed = ?T:pack_v1(StreamList, Options, term_to_binary(Term), Size), StreamList = ?T:unpack_v1(Packed, stream_list), TermBin = ?T:unpack_v1(Packed, page), Term = binary_to_term(TermBin) end || StreamList <- [[], [1], [1,2,4]], + Options <- [[]], Term <- [foo, {bar, baz, <<"yo">>}], Size <- lists:seq(100, 5000, 500)]. @@ -67,7 +68,7 @@ smoke_test() -> smoke_test_int(PageSize, Seq, P1) -> ok = ?SEQ:set_tails(Seq, [{42,4242}, {43,4343}]), - {ok, [4242, 4343]} = ?SEQ:get_tails(Seq, [42, 43]), + {ok, _, [4242, 4343]} = ?SEQ:get_tails(Seq, 0, [42, 43]), LPN_Pgs = [{X, ?T:pad_bin(PageSize, term_to_binary({smoke, X}))} || X <- lists:seq(1, 5)], @@ -97,16 +98,16 @@ write_forward_test_int(PageSize, _Seq, P1) -> write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum) -> write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum, 0). -write_stream_pages(Proj0, Pages, PageSize, InitialBackPs, StreamNum, Junk) -> - WriteJunk = fun() -> JP0 = tango:pack_v1([], <<>>, PageSize), - {{ok, _}, _} = corfurl_client:append_page(Proj0, JP0) +write_stream_pages(Proj0, Pages, _PageSize, InitialBackPs, StreamNum, Junk) -> + WriteJunk = fun() -> JP0 = <<"blah">>, + {{ok, _}, _} = tango:append_page(Proj0, JP0, + [StreamNum]) end, F = fun(Page, {Proj1, BackPs}) -> if Junk band 1 /= 0 -> WriteJunk(); true -> ok end, - FullPage = tango:pack_v1(BackPs, Page, PageSize), {{ok, LPN}, Proj2} = - corfurl_client:append_page(Proj1, FullPage), + tango:append_page(Proj1, Page, [StreamNum]), if Junk band 1 /= 0 -> WriteJunk(); true -> ok end, {Proj2, tango:add_back_pointer(StreamNum, BackPs, LPN)}