From 30fc62ab226f59de497c3356edc24daab41a65a7 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 20 Sep 2014 21:44:03 +0900 Subject: [PATCH] Gadz, more sequencer cleanup. corfurl_sequencer_test now passes --- .../tango-prototype/src/corfurl_sequencer.erl | 33 +- prototype/tango-prototype/src/tango.erl | 72 +++- prototype/tango-prototype/src/tango_dt.erl | 2 +- prototype/tango-prototype/src/tango_oid.erl | 311 +++++++++--------- .../test/corfurl_sequencer_test.erl | 39 ++- 5 files changed, 266 insertions(+), 191 deletions(-) diff --git a/prototype/tango-prototype/src/corfurl_sequencer.erl b/prototype/tango-prototype/src/corfurl_sequencer.erl index b73ae06..2dd9054 100644 --- a/prototype/tango-prototype/src/corfurl_sequencer.erl +++ b/prototype/tango-prototype/src/corfurl_sequencer.erl @@ -24,7 +24,7 @@ -export([start_link/1, start_link/2, start_link/3, stop/1, stop/2, - get/2, get/3, get_tails/2]). + get/2, get_tails/3]). -export([set_tails/2]). -ifdef(TEST). -compile(export_all). @@ -77,17 +77,15 @@ stop(Pid, Method) -> end. get(Pid, NumPages) -> - get(Pid, NumPages, []). - -get(Pid, NumPages, StreamList) -> - {LPN, LC} = gen_server:call(Pid, {get, NumPages, StreamList, lclock_get()}, + {LPN, LC} = gen_server:call(Pid, {get, NumPages, lclock_get()}, ?LONG_TIME), lclock_update(LC), LPN. -get_tails(Pid, StreamList) -> - {Tails, LC} = gen_server:call(Pid, {get_tails, StreamList, lclock_get()}, - ?LONG_TIME), +get_tails(Pid, NumPages, StreamList) -> + {Tails, LC} = gen_server:call(Pid, + {get_tails, NumPages, StreamList, lclock_get()}, + ?LONG_TIME), lclock_update(LC), Tails. @@ -108,13 +106,11 @@ init({FLUs, TypeOrSeed}) -> {ok, {Tab, MLP+1, BadPercent, MaxDifference}} end. -handle_call({get, NumPages, StreamList, LC}, _From, {Tab, MLP}) -> - update_stream_tails(Tab, StreamList, MLP), +handle_call({get, NumPages, LC}, _From, {Tab, MLP}) -> NewLC = lclock_update(LC), {reply, {{ok, MLP}, NewLC}, {Tab, MLP + NumPages}}; -handle_call({get, NumPages, StreamList, LC}, _From, +handle_call({get, NumPages, LC}, _From, {Tab, MLP, BadPercent, MaxDifference}) -> - [ets:insert(Tab, {Stream, MLP}) || Stream <- StreamList], NewLC = lclock_update(LC), Fudge = case random:uniform(100) of N when N < BadPercent -> @@ -124,16 +120,23 @@ handle_call({get, NumPages, StreamList, LC}, _From, end, {reply, {{ok, erlang:max(1, MLP + Fudge)}, NewLC}, {Tab, MLP + NumPages, BadPercent, MaxDifference}}; -handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) -> +handle_call({get_tails, NumPages, StreamList, LC}, _From, MLP_tuple) -> Tab = element(1, MLP_tuple), + MLP = element(2, MLP_tuple), + if NumPages > 0 -> + update_stream_tails(Tab, StreamList, MLP); + true -> + ok + end, Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of {'EXIT', _} -> - 1; + []; Res -> Res end || Stream <- StreamList], NewLC = lclock_update(LC), - {reply, {{ok, Tails}, NewLC}, MLP_tuple}; + {reply, {{ok, MLP, Tails}, NewLC}, + setelement(2, MLP_tuple, MLP + NumPages)}; handle_call({set_tails, StreamTails}, _From, MLP_tuple) -> Tab = element(1, MLP_tuple), true = ets:delete_all_objects(Tab), diff --git a/prototype/tango-prototype/src/tango.erl b/prototype/tango-prototype/src/tango.erl index e5c272d..4ba0126 100644 --- a/prototype/tango-prototype/src/tango.erl +++ b/prototype/tango-prototype/src/tango.erl @@ -18,16 +18,19 @@ %% %% ------------------------------------------------------------------- -%% A prototype implementation of Tango over CORFU. +%% A prototype implementation of Tango over Corfurl. -module(tango). --export([pack_v1/3, unpack_v1/2, +-include("corfurl.hrl"). + +-export([pack_v1/4, unpack_v1/2, add_back_pointer/2, add_back_pointer/3, scan_backward/4, scan_backward/5, - pad_bin/2]). + pad_bin/2, + append_page/3]). -define(MAGIC_NUMBER_V1, 16#88990011). @@ -35,18 +38,23 @@ %% TODO: for version 2: add strong checksum -pack_v1(StreamList, Page, PageSize) when is_list(StreamList), is_binary(Page) -> +pack_v1(StreamList, Options, Page, PageSize) + when is_list(StreamList), is_list(Options), is_binary(Page), + is_integer(PageSize), PageSize > 0 -> StreamListBin = term_to_binary(StreamList), StreamListSize = byte_size(StreamListBin), + OptionsInt = convert_options_list2int(Options), PageActualSize = byte_size(Page), pad_bin(PageSize, list_to_binary([<>, + <>, <>, StreamListBin, <>, Page])). unpack_v1(<>, Part) -> @@ -78,6 +86,11 @@ add_back_pointer([], New) -> add_back_pointer(BackPs, New) -> [New|BackPs]. +convert_options_list2int(Options) -> + lists:foldl(fun(to_final_page, Int) -> Int + 1; + (_, Int) -> Int + end, 0, Options). + scan_backward(Proj, Stream, LastLPN, WithPagesP) -> scan_backward(Proj, Stream, LastLPN, 0, WithPagesP). @@ -137,3 +150,54 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) -> Err end. +%% Hrm, this looks pretty similar to corfurl_client:append_page. + +append_page(Proj, Page, StreamList) -> + append_page(Proj, Page, StreamList, 5). + +append_page(Proj, _Page, _StreamList, 0) -> + {{error_failed, ?MODULE, ?LINE}, Proj}; +append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, StreamList, Retries) -> + try + {ok, LPN} = corfurl_sequencer:get(Sequencer, 1, StreamList), + %% pulse_tracing_add(write, LPN), + append_page1(Proj, LPN, Page, StreamList, 5) + catch + exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}} + when Reason == noproc; Reason == normal -> + NewSeq = corfurl_client:restart_sequencer(Proj), + append_page(Proj#proj{seq=NewSeq}, Page, StreamList, Retries); + exit:Exit -> + {{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit} + end. + +append_page1(Proj, _LPN, _Page, _StreamList, 0) -> + {{error_failed, ?MODULE, ?LINE}, Proj}; +append_page1(Proj, LPN, Page, StreamList, Retries) -> + case append_page2(Proj, LPN, Page) of + lost_race -> + append_page(Proj, Page, StreamList, Retries - 1); + error_badepoch -> + case corfurl_sequencer:poll_for_new_epoch_projection(Proj) of + {ok, NewProj} -> + append_page1(NewProj, LPN, Page, StreamList, Retries - 1); + Else -> + {Else, Proj} + end; + Else -> + {Else, Proj} + end. + +append_page2(Proj, LPN, Page) -> + case corfurl:write_page(Proj, LPN, Page) of + ok -> + {ok, LPN}; + X when X == error_overwritten; X == error_trimmed -> + %% report_lost_race(LPN, X), + lost_race; + {special_trimmed, LPN}=XX -> + XX; + error_badepoch=XX-> + XX + %% Let it crash: error_unwritten + end. diff --git a/prototype/tango-prototype/src/tango_dt.erl b/prototype/tango-prototype/src/tango_dt.erl index ba4296a..066066c 100644 --- a/prototype/tango-prototype/src/tango_dt.erl +++ b/prototype/tango-prototype/src/tango_dt.erl @@ -36,7 +36,7 @@ -type lpn() :: non_neg_integer(). -record(state, { - page_size :: non_neg_integer(), % CORFU page size + page_size :: non_neg_integer(), % Corfurl page size seq :: pid(), % sequencer pid proj :: term(), % projection stream_num :: non_neg_integer(), % this instance's OID number diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index cc83b5a..36d8b01 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -20,192 +20,193 @@ -module(tango_oid). --behaviour(gen_server). +%% -behaviour(gen_server). -%% API --export([start_link/3, stop/1, - new/2, get/2]). +%% %% API +%% -export([start_link/3, stop/1, +%% new/2, get/2]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +%% %% gen_server callbacks +%% -export([init/1, handle_call/3, handle_cast/2, handle_info/2, +%% terminate/2, code_change/3]). -%% Tango datatype callbacks (prototype) --export([fresh/0, - do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). +%% %% Tango datatype callbacks (prototype) +%% -export([fresh/0, +%% do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]). --define(SERVER, ?MODULE). --define(OID_STREAM_NUMBER, 0). +%% -define(SERVER, ?MODULE). +%% -define(OID_STREAM_NUMBER, 0). --define(LONG_TIME, 30*1000). +%% -define(LONG_TIME, 30*1000). --define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). +%% -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). --type lpn() :: non_neg_integer(). +%% -type lpn() :: non_neg_integer(). --record(state, { - page_size :: non_neg_integer(), % CORFU page size - seq :: pid(), % sequencer pid - proj :: term(), % projection - last_read_lpn :: lpn(), % - last_write_lpn :: lpn(), - back_ps :: [lpn()], % back pointers (up to 4) - i_state :: term() % internal state thingie - }). +%% -record(state, { +%% page_size :: non_neg_integer(), % Corfurl page size +%% seq :: pid(), % sequencer pid +%% proj :: term(), % projection +%% last_fetch_lpn :: lpn(), % +%% all_back_ps :: [lpn()], % All back-pointers LIFO order! +%% i_state :: term() % internal state thingie +%% }). -start_link(PageSize, SequencerPid, Proj) -> - gen_server:start_link(?MODULE, - [PageSize, SequencerPid, Proj], []). +%% start_link(PageSize, SequencerPid, Proj) -> +%% gen_server:start_link(?MODULE, +%% [PageSize, SequencerPid, Proj], []). -stop(Pid) -> - gen_server:call(Pid, {stop}, ?LONG_TIME). +%% stop(Pid) -> +%% gen_server:call(Pid, {stop}, ?LONG_TIME). -new(Pid, Key) -> - gen_server:call(Pid, {new, Key}, ?LONG_TIME). +%% new(Pid, Key) -> +%% gen_server:call(Pid, {new, Key}, ?LONG_TIME). -get(Pid, Key) -> - gen_server:call(Pid, {get, Key}, ?LONG_TIME). +%% get(Pid, Key) -> +%% gen_server:call(Pid, {get, Key}, ?LONG_TIME). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -init([PageSize, SequencerPid, Proj]) -> - LastLPN = find_last_lpn(SequencerPid), - {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), - I_State = play_log_pages(Pages, fresh(), ?MODULE, false), - {ok, #state{page_size=PageSize, - seq=SequencerPid, - proj=Proj, - last_read_lpn=LastLPN, - last_write_lpn=LastLPN, - back_ps=BackPs, - i_state=I_State}}. +%% init([PageSize, SequencerPid, Proj]) -> +%% LastLPN = find_last_lpn(SequencerPid), +%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), +%% BackPs = lists:reverse(LPNs), +%% LastFetchLPN = case LPNs of [] -> 0; +%% [H|_] -> H +%% end, +%% I_State = play_log_pages(Pages, fresh(), ?MODULE, false), +%% {ok, #state{page_size=PageSize, +%% seq=SequencerPid, +%% proj=Proj, +%% last_fetch_lpn=LastFetchLPN, +%% all_back_ps=BackPs, +%% i_state=I_State}}. -handle_call({new, Key}, From, - #state{proj=Proj0, - page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) -> - Op = {new_oid, Key, From}, - {_Res, I_State2, Proj1, LPN, NewBackPs} = - do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, - Proj0, PageSize, BackPs), - %% Let's see how much trouble we can get outselves in here. - %% If we're here, then we've written to the log without error. - %% So then the cast to roll forward must see that log entry - %% (if it also operates without error). So, the side-effect of - %% the op ought to always send a reply to the client. - gen_server:cast(self(), {roll_forward}), - {noreply, State#state{i_state=I_State2, - proj=Proj1, - last_write_lpn=LPN, - back_ps=NewBackPs}}; -handle_call({get, _Key}=Op, _From, State) -> - State2 = #state{i_state=I_State} = roll_log_forward(State), - Reply = do_pure_op(Op, I_State), - {reply, Reply, State2}; -handle_call({stop}, _From, State) -> - {stop, normal, ok, State}; -handle_call(_Request, _From, State) -> - Reply = whaaaaaaaaaaaa, - {reply, Reply, State}. +%% handle_call({new, Key}, From, +%% #state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) -> +%% Op = {new_oid, Key, From, 0}, +%% {_Res, I_State2, Proj1, LPN} = +%% do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize), +%% %% Let's see how much trouble we can get outselves in here. +%% %% If we're here, then we've written to the log without error. +%% %% So then the cast to roll forward must see that log entry +%% %% (if it also operates without error). So, the side-effect of +%% %% the op ought to always send a reply to the client. +%% gen_server:cast(self(), {roll_forward}), +%% {noreply, State#state{i_state=I_State2, +%% proj=Proj1}}; +%% handle_call({get, _Key}=Op, _From, State) -> +%% State2 = #state{i_state=I_State} = roll_log_forward(State), +%% Reply = do_pure_op(Op, I_State), +%% {reply, Reply, State2}; +%% handle_call({stop}, _From, State) -> +%% {stop, normal, ok, State}; +%% handle_call(_Request, _From, State) -> +%% Reply = whaaaaaaaaaaaa, +%% {reply, Reply, State}. -handle_cast({roll_forward}, State) -> - State2 = roll_log_forward(State), - {noreply, State2}; -handle_cast(_Msg, State) -> - {noreply, State}. +%% handle_cast({roll_forward}, State) -> +%% State2 = roll_log_forward(State), +%% {noreply, State2}; +%% handle_cast(_Msg, State) -> +%% {noreply, State}. -handle_info(_Info, State) -> - {noreply, State}. +%% handle_info(_Info, State) -> +%% {noreply, State}. -terminate(_Reason, _State) -> - ok. +%% terminate(_Reason, _State) -> +%% ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%% code_change(_OldVsn, State, _Extra) -> +%% {ok, State}. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -find_last_lpn(#state{seq=SequencerPid}) -> - find_last_lpn(SequencerPid); -find_last_lpn(SequencerPid) -> - {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), - CurrentLPN - 1. +%% find_last_lpn(#state{seq=SequencerPid}) -> +%% find_last_lpn(SequencerPid); +%% find_last_lpn(SequencerPid) -> +%% {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), +%% CurrentLPN - 1. -fetch_unread_pages(#state{seq=___FIXME_SequencerPid, proj=Proj, - last_read_lpn=StopAtLPN, - last_write_lpn=LastLPN} = State) -> - %% TODO: fixme: to handle concurrent updates correctly, we should - %% query the sequencer for the last LPN. - {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), - %% TODO ???? - %% LastReadLPN = if BackPs == [] -> 0; - %% true -> hd(BackPs) - %% end, - {Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}. +%% %% AAA refactor: return value changes here, propagate +%% fetch_unread_pages(#state{proj=Proj, last_fetch_lpn=StopAtLPN, +%% all_back_ps=BPs} = State) -> +%% LastLPN = find_last_lpn(State), +%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), +%% NewBPs = append_lpns(LPNs, BPs), +%% {LPNS, Pages, State#state{last_fetch_lpn=LastLPN, back_ps=BackPs}}. -fetch_unread_pages(Proj, LastLPN, StopAtLPN) -> - %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), - LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, - StopAtLPN, true), - {LPNs, Pages} = lists:unzip(LPNandPages), - BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end, - [], LPNs), - {BackPs, Pages}. +%% fetch_unread_pages(Proj, LastLPN, StopAtLPN) +%% when LastLPN >= StopAtLPN -> +%% %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), +%% LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, +%% StopAtLPN, true), +%% {_LPNs, _Pages} = lists:unzip(LPNandPages). -play_log_pages(Pages, SideEffectsP, - #state{i_state=I_State} = State) -> - I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP), - State#state{i_state=I_State2}. +%% play_log_pages(Pages, SideEffectsP, +%% #state{i_state=I_State} = State) -> +%% I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP), +%% State#state{i_state=I_State2}. -play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> - CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). +%% play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> +%% CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). -roll_log_forward(State) -> - {Pages, State2} = fetch_unread_pages(State), - play_log_pages(Pages, true, State2). +%% roll_log_forward(#state{proj=Proj, all_back_ps=BackPs, +%% last_fetch_lpn=StopAtLPN} = State) -> +%% LastLPN = find_last_lpn(SequencerPid), +%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), +%% NewBPs = append_lpns(LPNs, BackPs), +%% play_log_pages(Pages, true, State2#state{all_back_ps=NewBPs}). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% append_lpns([], BPs) -> +%% BPs; +%% append_lpns(LPNs, BPs) -> +%% lists:reverse(LPNs) ++ BPs. --record(oid_map, { - next :: non_neg_integer(), - map :: dict() - }). +%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --define(DICTMOD, dict). +%% -record(oid_map, { +%% next :: non_neg_integer(), +%% map :: dict() +%% }). -fresh() -> - #oid_map{next=1, - map=?DICTMOD:new()}. +%% -define(DICTMOD, dict). -do_pure_op({get, Key}, #oid_map{map=Dict}) -> - ?DICTMOD:find(Key, Dict). +%% fresh() -> +%% #oid_map{next=1, +%% map=?DICTMOD:new()}. -do_dirty_op({new_oid, _Key, _From}=Op, - I_State, StreamNum, Proj0, PageSize, BackPs) -> - Page = term_to_binary(Op), - FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, - [StreamNum]), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {ok, I_State, Proj1, LPN, NewBackPs}. +%% do_pure_op({get, Key}, #oid_map{map=Dict}) -> +%% ?DICTMOD:find(Key, Dict). -play_log_mutate_i_state(Pages, SideEffectsP, I_State) -> - lists:foldl(fun({new_oid, Key, From}=_Op, #oid_map{map=Dict, next=Next}=O) -> - {Res, O2} = - case ?DICTMOD:find(Key, Dict) of - error -> - Dict2 = ?DICTMOD:store(Key, Next, Dict), - {{ok, Next},O#oid_map{map=Dict2, - next=Next + 1}}; - {ok, _} -> - {already_exists, O} - end, - if SideEffectsP -> - gen_server:reply(From, Res); - true -> - ok - end, - O2 - end, - I_State, - [binary_to_term(Page) || Page <- Pages]). +%% do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op, +%% I_State, StreamNum, Proj0, PageSize) -> +%% Page = term_to_binary(Op), +%% FullPage = tango:pack_v1([StreamNum], [to_final_page], +%% Page, PageSize), +%% left off here, +%% {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), +%% {ok, I_State, Proj1, LPN}. + +%% play_log_mutate_i_state(Pages, SideEffectsP, I_State) -> +%% lists:foldl(fun({new_oid, Key, From, _NumOfAttempts}=_Op, +%% #oid_map{map=Dict, next=Next}=O) -> +%% {Res, O2} = +%% case ?DICTMOD:find(Key, Dict) of +%% error -> +%% Dict2 = ?DICTMOD:store(Key, Next, Dict), +%% {{ok, Next},O#oid_map{map=Dict2, +%% next=Next + 1}}; +%% {ok, _} -> +%% {already_exists, O} +%% end, +%% if SideEffectsP -> +%% gen_server:reply(From, Res); +%% true -> +%% ok +%% end, +%% O2 +%% end, +%% I_State, +%% [binary_to_term(Page) || Page <- Pages]). diff --git a/prototype/tango-prototype/test/corfurl_sequencer_test.erl b/prototype/tango-prototype/test/corfurl_sequencer_test.erl index 789a6d5..3396a17 100644 --- a/prototype/tango-prototype/test/corfurl_sequencer_test.erl +++ b/prototype/tango-prototype/test/corfurl_sequencer_test.erl @@ -57,28 +57,35 @@ smoke_test() -> MLP0 = NumFLUs, NumFLUs = ?M:get_max_logical_page(FLUs), - %% Excellent. Now let's start the sequencer and see if it gets - %% the same answer. If yes, then the first get will return MLP1, - %% yadda yadda. - MLP1 = MLP0 + 1, - MLP3 = MLP0 + 3, - MLP4 = MLP0 + 4, {ok, Sequencer} = ?M:start_link(FLUs), try + {ok, _} = ?M:get(Sequencer, 5000), [{Stream9, Tails9}] = StreamTails = [{9, [1125, 1124, 1123]}], ok = ?M:set_tails(Sequencer, StreamTails), - {ok, [Tails9]} = ?M:get_tails(Sequencer, [Stream9]), + {ok, _, [Tails9]} = ?M:get_tails(Sequencer, 0, [Stream9]), - {ok, LPN1} = ?M:get(Sequencer, 2), - {ok, LPN3} = ?M:get(Sequencer, 1, [2]), - {ok, LPN4} = ?M:get(Sequencer, 1, [1]), - {ok, LPN5} = ?M:get(Sequencer, 1, [2]), - {ok, LPN6} = ?M:get(Sequencer, 1, [2]), - {ok, LPN7} = ?M:get(Sequencer, 1, [2]), - {ok, LPN8} = ?M:get(Sequencer, 1, [2]), + {ok, LPN0a} = ?M:get(Sequencer, 2), + {ok, LPN0b} = ?M:get(Sequencer, 0), + LPN0a = LPN0b - 2, + + {ok, LPN2a, _} = ?M:get_tails(Sequencer, 1, [2]), + {ok, LPN1a, _} = ?M:get_tails(Sequencer, 1, [1]), + {ok, _, [[LPN1a], [LPN2a]]} = ?M:get_tails(Sequencer, + 0, [1,2]), + {ok, LPN2b, _} = ?M:get_tails(Sequencer, 1, [2]), + {ok, LPN2c, _} = ?M:get_tails(Sequencer, 1, [2]), + {ok, _, [[LPN1a], [LPN2c, LPN2b, LPN2a]]} = + ?M:get_tails(Sequencer, 0, [1,2]), + {ok, LPN2d, _} = ?M:get_tails(Sequencer, 1, [2]), + {ok, LPN2e, _} = ?M:get_tails(Sequencer, 1, [2]), - {ok, [[LPN4], [LPN8, LPN7, LPN6, LPN5]]} = ?M:get_tails(Sequencer, - [1,2]) + {ok, LPNX, [[LP1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} = + ?M:get_tails(Sequencer, 0, [1,2]), + {ok, LPNX, [[LP1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} = + ?M:get_tails(Sequencer, 0, [1,2]), % same results + LPNX = LPN2e + 1, % no change with 0 request + + ok after ?M:stop(Sequencer) end