Gadz, more sequencer cleanup. corfurl_sequencer_test now passes
This commit is contained in:
parent
b8c051c89f
commit
30fc62ab22
5 changed files with 266 additions and 191 deletions
|
@ -24,7 +24,7 @@
|
||||||
|
|
||||||
-export([start_link/1, start_link/2, start_link/3,
|
-export([start_link/1, start_link/2, start_link/3,
|
||||||
stop/1, stop/2,
|
stop/1, stop/2,
|
||||||
get/2, get/3, get_tails/2]).
|
get/2, get_tails/3]).
|
||||||
-export([set_tails/2]).
|
-export([set_tails/2]).
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -77,17 +77,15 @@ stop(Pid, Method) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get(Pid, NumPages) ->
|
get(Pid, NumPages) ->
|
||||||
get(Pid, NumPages, []).
|
{LPN, LC} = gen_server:call(Pid, {get, NumPages, lclock_get()},
|
||||||
|
|
||||||
get(Pid, NumPages, StreamList) ->
|
|
||||||
{LPN, LC} = gen_server:call(Pid, {get, NumPages, StreamList, lclock_get()},
|
|
||||||
?LONG_TIME),
|
?LONG_TIME),
|
||||||
lclock_update(LC),
|
lclock_update(LC),
|
||||||
LPN.
|
LPN.
|
||||||
|
|
||||||
get_tails(Pid, StreamList) ->
|
get_tails(Pid, NumPages, StreamList) ->
|
||||||
{Tails, LC} = gen_server:call(Pid, {get_tails, StreamList, lclock_get()},
|
{Tails, LC} = gen_server:call(Pid,
|
||||||
?LONG_TIME),
|
{get_tails, NumPages, StreamList, lclock_get()},
|
||||||
|
?LONG_TIME),
|
||||||
lclock_update(LC),
|
lclock_update(LC),
|
||||||
Tails.
|
Tails.
|
||||||
|
|
||||||
|
@ -108,13 +106,11 @@ init({FLUs, TypeOrSeed}) ->
|
||||||
{ok, {Tab, MLP+1, BadPercent, MaxDifference}}
|
{ok, {Tab, MLP+1, BadPercent, MaxDifference}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call({get, NumPages, StreamList, LC}, _From, {Tab, MLP}) ->
|
handle_call({get, NumPages, LC}, _From, {Tab, MLP}) ->
|
||||||
update_stream_tails(Tab, StreamList, MLP),
|
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
{reply, {{ok, MLP}, NewLC}, {Tab, MLP + NumPages}};
|
{reply, {{ok, MLP}, NewLC}, {Tab, MLP + NumPages}};
|
||||||
handle_call({get, NumPages, StreamList, LC}, _From,
|
handle_call({get, NumPages, LC}, _From,
|
||||||
{Tab, MLP, BadPercent, MaxDifference}) ->
|
{Tab, MLP, BadPercent, MaxDifference}) ->
|
||||||
[ets:insert(Tab, {Stream, MLP}) || Stream <- StreamList],
|
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
Fudge = case random:uniform(100) of
|
Fudge = case random:uniform(100) of
|
||||||
N when N < BadPercent ->
|
N when N < BadPercent ->
|
||||||
|
@ -124,16 +120,23 @@ handle_call({get, NumPages, StreamList, LC}, _From,
|
||||||
end,
|
end,
|
||||||
{reply, {{ok, erlang:max(1, MLP + Fudge)}, NewLC},
|
{reply, {{ok, erlang:max(1, MLP + Fudge)}, NewLC},
|
||||||
{Tab, MLP + NumPages, BadPercent, MaxDifference}};
|
{Tab, MLP + NumPages, BadPercent, MaxDifference}};
|
||||||
handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) ->
|
handle_call({get_tails, NumPages, StreamList, LC}, _From, MLP_tuple) ->
|
||||||
Tab = element(1, MLP_tuple),
|
Tab = element(1, MLP_tuple),
|
||||||
|
MLP = element(2, MLP_tuple),
|
||||||
|
if NumPages > 0 ->
|
||||||
|
update_stream_tails(Tab, StreamList, MLP);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of
|
Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of
|
||||||
{'EXIT', _} ->
|
{'EXIT', _} ->
|
||||||
1;
|
[];
|
||||||
Res ->
|
Res ->
|
||||||
Res
|
Res
|
||||||
end || Stream <- StreamList],
|
end || Stream <- StreamList],
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
{reply, {{ok, Tails}, NewLC}, MLP_tuple};
|
{reply, {{ok, MLP, Tails}, NewLC},
|
||||||
|
setelement(2, MLP_tuple, MLP + NumPages)};
|
||||||
handle_call({set_tails, StreamTails}, _From, MLP_tuple) ->
|
handle_call({set_tails, StreamTails}, _From, MLP_tuple) ->
|
||||||
Tab = element(1, MLP_tuple),
|
Tab = element(1, MLP_tuple),
|
||||||
true = ets:delete_all_objects(Tab),
|
true = ets:delete_all_objects(Tab),
|
||||||
|
|
|
@ -18,16 +18,19 @@
|
||||||
%%
|
%%
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
%% A prototype implementation of Tango over CORFU.
|
%% A prototype implementation of Tango over Corfurl.
|
||||||
|
|
||||||
-module(tango).
|
-module(tango).
|
||||||
|
|
||||||
-export([pack_v1/3, unpack_v1/2,
|
-include("corfurl.hrl").
|
||||||
|
|
||||||
|
-export([pack_v1/4, unpack_v1/2,
|
||||||
add_back_pointer/2,
|
add_back_pointer/2,
|
||||||
add_back_pointer/3,
|
add_back_pointer/3,
|
||||||
scan_backward/4,
|
scan_backward/4,
|
||||||
scan_backward/5,
|
scan_backward/5,
|
||||||
pad_bin/2]).
|
pad_bin/2,
|
||||||
|
append_page/3]).
|
||||||
|
|
||||||
-define(MAGIC_NUMBER_V1, 16#88990011).
|
-define(MAGIC_NUMBER_V1, 16#88990011).
|
||||||
|
|
||||||
|
@ -35,18 +38,23 @@
|
||||||
|
|
||||||
%% TODO: for version 2: add strong checksum
|
%% TODO: for version 2: add strong checksum
|
||||||
|
|
||||||
pack_v1(StreamList, Page, PageSize) when is_list(StreamList), is_binary(Page) ->
|
pack_v1(StreamList, Options, Page, PageSize)
|
||||||
|
when is_list(StreamList), is_list(Options), is_binary(Page),
|
||||||
|
is_integer(PageSize), PageSize > 0 ->
|
||||||
StreamListBin = term_to_binary(StreamList),
|
StreamListBin = term_to_binary(StreamList),
|
||||||
StreamListSize = byte_size(StreamListBin),
|
StreamListSize = byte_size(StreamListBin),
|
||||||
|
OptionsInt = convert_options_list2int(Options),
|
||||||
PageActualSize = byte_size(Page),
|
PageActualSize = byte_size(Page),
|
||||||
pad_bin(PageSize,
|
pad_bin(PageSize,
|
||||||
list_to_binary([<<?MAGIC_NUMBER_V1:32/big>>,
|
list_to_binary([<<?MAGIC_NUMBER_V1:32/big>>,
|
||||||
|
<<OptionsInt:8/big>>,
|
||||||
<<StreamListSize:16/big>>,
|
<<StreamListSize:16/big>>,
|
||||||
StreamListBin,
|
StreamListBin,
|
||||||
<<PageActualSize:16/big>>,
|
<<PageActualSize:16/big>>,
|
||||||
Page])).
|
Page])).
|
||||||
|
|
||||||
unpack_v1(<<?MAGIC_NUMBER_V1:32/big,
|
unpack_v1(<<?MAGIC_NUMBER_V1:32/big,
|
||||||
|
_Options:8/big,
|
||||||
StreamListSize:16/big, StreamListBin:StreamListSize/binary,
|
StreamListSize:16/big, StreamListBin:StreamListSize/binary,
|
||||||
PageActualSize:16/big, Page:PageActualSize/binary,
|
PageActualSize:16/big, Page:PageActualSize/binary,
|
||||||
_/binary>>, Part) ->
|
_/binary>>, Part) ->
|
||||||
|
@ -78,6 +86,11 @@ add_back_pointer([], New) ->
|
||||||
add_back_pointer(BackPs, New) ->
|
add_back_pointer(BackPs, New) ->
|
||||||
[New|BackPs].
|
[New|BackPs].
|
||||||
|
|
||||||
|
convert_options_list2int(Options) ->
|
||||||
|
lists:foldl(fun(to_final_page, Int) -> Int + 1;
|
||||||
|
(_, Int) -> Int
|
||||||
|
end, 0, Options).
|
||||||
|
|
||||||
scan_backward(Proj, Stream, LastLPN, WithPagesP) ->
|
scan_backward(Proj, Stream, LastLPN, WithPagesP) ->
|
||||||
scan_backward(Proj, Stream, LastLPN, 0, WithPagesP).
|
scan_backward(Proj, Stream, LastLPN, 0, WithPagesP).
|
||||||
|
|
||||||
|
@ -137,3 +150,54 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Hrm, this looks pretty similar to corfurl_client:append_page.
|
||||||
|
|
||||||
|
append_page(Proj, Page, StreamList) ->
|
||||||
|
append_page(Proj, Page, StreamList, 5).
|
||||||
|
|
||||||
|
append_page(Proj, _Page, _StreamList, 0) ->
|
||||||
|
{{error_failed, ?MODULE, ?LINE}, Proj};
|
||||||
|
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, StreamList, Retries) ->
|
||||||
|
try
|
||||||
|
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1, StreamList),
|
||||||
|
%% pulse_tracing_add(write, LPN),
|
||||||
|
append_page1(Proj, LPN, Page, StreamList, 5)
|
||||||
|
catch
|
||||||
|
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
|
||||||
|
when Reason == noproc; Reason == normal ->
|
||||||
|
NewSeq = corfurl_client:restart_sequencer(Proj),
|
||||||
|
append_page(Proj#proj{seq=NewSeq}, Page, StreamList, Retries);
|
||||||
|
exit:Exit ->
|
||||||
|
{{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit}
|
||||||
|
end.
|
||||||
|
|
||||||
|
append_page1(Proj, _LPN, _Page, _StreamList, 0) ->
|
||||||
|
{{error_failed, ?MODULE, ?LINE}, Proj};
|
||||||
|
append_page1(Proj, LPN, Page, StreamList, Retries) ->
|
||||||
|
case append_page2(Proj, LPN, Page) of
|
||||||
|
lost_race ->
|
||||||
|
append_page(Proj, Page, StreamList, Retries - 1);
|
||||||
|
error_badepoch ->
|
||||||
|
case corfurl_sequencer:poll_for_new_epoch_projection(Proj) of
|
||||||
|
{ok, NewProj} ->
|
||||||
|
append_page1(NewProj, LPN, Page, StreamList, Retries - 1);
|
||||||
|
Else ->
|
||||||
|
{Else, Proj}
|
||||||
|
end;
|
||||||
|
Else ->
|
||||||
|
{Else, Proj}
|
||||||
|
end.
|
||||||
|
|
||||||
|
append_page2(Proj, LPN, Page) ->
|
||||||
|
case corfurl:write_page(Proj, LPN, Page) of
|
||||||
|
ok ->
|
||||||
|
{ok, LPN};
|
||||||
|
X when X == error_overwritten; X == error_trimmed ->
|
||||||
|
%% report_lost_race(LPN, X),
|
||||||
|
lost_race;
|
||||||
|
{special_trimmed, LPN}=XX ->
|
||||||
|
XX;
|
||||||
|
error_badepoch=XX->
|
||||||
|
XX
|
||||||
|
%% Let it crash: error_unwritten
|
||||||
|
end.
|
||||||
|
|
|
@ -36,7 +36,7 @@
|
||||||
-type lpn() :: non_neg_integer().
|
-type lpn() :: non_neg_integer().
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
page_size :: non_neg_integer(), % CORFU page size
|
page_size :: non_neg_integer(), % Corfurl page size
|
||||||
seq :: pid(), % sequencer pid
|
seq :: pid(), % sequencer pid
|
||||||
proj :: term(), % projection
|
proj :: term(), % projection
|
||||||
stream_num :: non_neg_integer(), % this instance's OID number
|
stream_num :: non_neg_integer(), % this instance's OID number
|
||||||
|
|
|
@ -20,192 +20,193 @@
|
||||||
|
|
||||||
-module(tango_oid).
|
-module(tango_oid).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
%% -behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% %% API
|
||||||
-export([start_link/3, stop/1,
|
%% -export([start_link/3, stop/1,
|
||||||
new/2, get/2]).
|
%% new/2, get/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% %% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
%% -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
%% terminate/2, code_change/3]).
|
||||||
|
|
||||||
%% Tango datatype callbacks (prototype)
|
%% %% Tango datatype callbacks (prototype)
|
||||||
-export([fresh/0,
|
%% -export([fresh/0,
|
||||||
do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]).
|
%% do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
%% -define(SERVER, ?MODULE).
|
||||||
-define(OID_STREAM_NUMBER, 0).
|
%% -define(OID_STREAM_NUMBER, 0).
|
||||||
|
|
||||||
-define(LONG_TIME, 30*1000).
|
%% -define(LONG_TIME, 30*1000).
|
||||||
|
|
||||||
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
%% -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||||
|
|
||||||
-type lpn() :: non_neg_integer().
|
%% -type lpn() :: non_neg_integer().
|
||||||
|
|
||||||
-record(state, {
|
%% -record(state, {
|
||||||
page_size :: non_neg_integer(), % CORFU page size
|
%% page_size :: non_neg_integer(), % Corfurl page size
|
||||||
seq :: pid(), % sequencer pid
|
%% seq :: pid(), % sequencer pid
|
||||||
proj :: term(), % projection
|
%% proj :: term(), % projection
|
||||||
last_read_lpn :: lpn(), %
|
%% last_fetch_lpn :: lpn(), %
|
||||||
last_write_lpn :: lpn(),
|
%% all_back_ps :: [lpn()], % All back-pointers LIFO order!
|
||||||
back_ps :: [lpn()], % back pointers (up to 4)
|
%% i_state :: term() % internal state thingie
|
||||||
i_state :: term() % internal state thingie
|
%% }).
|
||||||
}).
|
|
||||||
|
|
||||||
start_link(PageSize, SequencerPid, Proj) ->
|
%% start_link(PageSize, SequencerPid, Proj) ->
|
||||||
gen_server:start_link(?MODULE,
|
%% gen_server:start_link(?MODULE,
|
||||||
[PageSize, SequencerPid, Proj], []).
|
%% [PageSize, SequencerPid, Proj], []).
|
||||||
|
|
||||||
stop(Pid) ->
|
%% stop(Pid) ->
|
||||||
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
%% gen_server:call(Pid, {stop}, ?LONG_TIME).
|
||||||
|
|
||||||
new(Pid, Key) ->
|
%% new(Pid, Key) ->
|
||||||
gen_server:call(Pid, {new, Key}, ?LONG_TIME).
|
%% gen_server:call(Pid, {new, Key}, ?LONG_TIME).
|
||||||
|
|
||||||
get(Pid, Key) ->
|
%% get(Pid, Key) ->
|
||||||
gen_server:call(Pid, {get, Key}, ?LONG_TIME).
|
%% gen_server:call(Pid, {get, Key}, ?LONG_TIME).
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
init([PageSize, SequencerPid, Proj]) ->
|
%% init([PageSize, SequencerPid, Proj]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid),
|
%% LastLPN = find_last_lpn(SequencerPid),
|
||||||
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
||||||
I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
|
%% BackPs = lists:reverse(LPNs),
|
||||||
{ok, #state{page_size=PageSize,
|
%% LastFetchLPN = case LPNs of [] -> 0;
|
||||||
seq=SequencerPid,
|
%% [H|_] -> H
|
||||||
proj=Proj,
|
%% end,
|
||||||
last_read_lpn=LastLPN,
|
%% I_State = play_log_pages(Pages, fresh(), ?MODULE, false),
|
||||||
last_write_lpn=LastLPN,
|
%% {ok, #state{page_size=PageSize,
|
||||||
back_ps=BackPs,
|
%% seq=SequencerPid,
|
||||||
i_state=I_State}}.
|
%% proj=Proj,
|
||||||
|
%% last_fetch_lpn=LastFetchLPN,
|
||||||
|
%% all_back_ps=BackPs,
|
||||||
|
%% i_state=I_State}}.
|
||||||
|
|
||||||
handle_call({new, Key}, From,
|
%% handle_call({new, Key}, From,
|
||||||
#state{proj=Proj0,
|
%% #state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) ->
|
||||||
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
%% Op = {new_oid, Key, From, 0},
|
||||||
Op = {new_oid, Key, From},
|
%% {_Res, I_State2, Proj1, LPN} =
|
||||||
{_Res, I_State2, Proj1, LPN, NewBackPs} =
|
%% do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize),
|
||||||
do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
%% %% Let's see how much trouble we can get outselves in here.
|
||||||
Proj0, PageSize, BackPs),
|
%% %% If we're here, then we've written to the log without error.
|
||||||
%% Let's see how much trouble we can get outselves in here.
|
%% %% So then the cast to roll forward must see that log entry
|
||||||
%% If we're here, then we've written to the log without error.
|
%% %% (if it also operates without error). So, the side-effect of
|
||||||
%% So then the cast to roll forward must see that log entry
|
%% %% the op ought to always send a reply to the client.
|
||||||
%% (if it also operates without error). So, the side-effect of
|
%% gen_server:cast(self(), {roll_forward}),
|
||||||
%% the op ought to always send a reply to the client.
|
%% {noreply, State#state{i_state=I_State2,
|
||||||
gen_server:cast(self(), {roll_forward}),
|
%% proj=Proj1}};
|
||||||
{noreply, State#state{i_state=I_State2,
|
%% handle_call({get, _Key}=Op, _From, State) ->
|
||||||
proj=Proj1,
|
%% State2 = #state{i_state=I_State} = roll_log_forward(State),
|
||||||
last_write_lpn=LPN,
|
%% Reply = do_pure_op(Op, I_State),
|
||||||
back_ps=NewBackPs}};
|
%% {reply, Reply, State2};
|
||||||
handle_call({get, _Key}=Op, _From, State) ->
|
%% handle_call({stop}, _From, State) ->
|
||||||
State2 = #state{i_state=I_State} = roll_log_forward(State),
|
%% {stop, normal, ok, State};
|
||||||
Reply = do_pure_op(Op, I_State),
|
%% handle_call(_Request, _From, State) ->
|
||||||
{reply, Reply, State2};
|
%% Reply = whaaaaaaaaaaaa,
|
||||||
handle_call({stop}, _From, State) ->
|
%% {reply, Reply, State}.
|
||||||
{stop, normal, ok, State};
|
|
||||||
handle_call(_Request, _From, State) ->
|
|
||||||
Reply = whaaaaaaaaaaaa,
|
|
||||||
{reply, Reply, State}.
|
|
||||||
|
|
||||||
handle_cast({roll_forward}, State) ->
|
%% handle_cast({roll_forward}, State) ->
|
||||||
State2 = roll_log_forward(State),
|
%% State2 = roll_log_forward(State),
|
||||||
{noreply, State2};
|
%% {noreply, State2};
|
||||||
handle_cast(_Msg, State) ->
|
%% handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
%% {noreply, State}.
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
%% handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
%% {noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
%% terminate(_Reason, _State) ->
|
||||||
ok.
|
%% ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
%% code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
%% {ok, State}.
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
find_last_lpn(#state{seq=SequencerPid}) ->
|
%% find_last_lpn(#state{seq=SequencerPid}) ->
|
||||||
find_last_lpn(SequencerPid);
|
%% find_last_lpn(SequencerPid);
|
||||||
find_last_lpn(SequencerPid) ->
|
%% find_last_lpn(SequencerPid) ->
|
||||||
{ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
|
%% {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
|
||||||
CurrentLPN - 1.
|
%% CurrentLPN - 1.
|
||||||
|
|
||||||
fetch_unread_pages(#state{seq=___FIXME_SequencerPid, proj=Proj,
|
%% %% AAA refactor: return value changes here, propagate
|
||||||
last_read_lpn=StopAtLPN,
|
%% fetch_unread_pages(#state{proj=Proj, last_fetch_lpn=StopAtLPN,
|
||||||
last_write_lpn=LastLPN} = State) ->
|
%% all_back_ps=BPs} = State) ->
|
||||||
%% TODO: fixme: to handle concurrent updates correctly, we should
|
%% LastLPN = find_last_lpn(State),
|
||||||
%% query the sequencer for the last LPN.
|
%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
||||||
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
%% NewBPs = append_lpns(LPNs, BPs),
|
||||||
%% TODO ????
|
%% {LPNS, Pages, State#state{last_fetch_lpn=LastLPN, back_ps=BackPs}}.
|
||||||
%% LastReadLPN = if BackPs == [] -> 0;
|
|
||||||
%% true -> hd(BackPs)
|
|
||||||
%% end,
|
|
||||||
{Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}.
|
|
||||||
|
|
||||||
fetch_unread_pages(Proj, LastLPN, StopAtLPN) ->
|
%% fetch_unread_pages(Proj, LastLPN, StopAtLPN)
|
||||||
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
%% when LastLPN >= StopAtLPN ->
|
||||||
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
%% %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
||||||
StopAtLPN, true),
|
%% LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
||||||
{LPNs, Pages} = lists:unzip(LPNandPages),
|
%% StopAtLPN, true),
|
||||||
BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end,
|
%% {_LPNs, _Pages} = lists:unzip(LPNandPages).
|
||||||
[], LPNs),
|
|
||||||
{BackPs, Pages}.
|
|
||||||
|
|
||||||
play_log_pages(Pages, SideEffectsP,
|
%% play_log_pages(Pages, SideEffectsP,
|
||||||
#state{i_state=I_State} = State) ->
|
%% #state{i_state=I_State} = State) ->
|
||||||
I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP),
|
%% I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP),
|
||||||
State#state{i_state=I_State2}.
|
%% State#state{i_state=I_State2}.
|
||||||
|
|
||||||
play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
%% play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
||||||
CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State).
|
%% CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State).
|
||||||
|
|
||||||
roll_log_forward(State) ->
|
%% roll_log_forward(#state{proj=Proj, all_back_ps=BackPs,
|
||||||
{Pages, State2} = fetch_unread_pages(State),
|
%% last_fetch_lpn=StopAtLPN} = State) ->
|
||||||
play_log_pages(Pages, true, State2).
|
%% LastLPN = find_last_lpn(SequencerPid),
|
||||||
|
%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
||||||
|
%% NewBPs = append_lpns(LPNs, BackPs),
|
||||||
|
%% play_log_pages(Pages, true, State2#state{all_back_ps=NewBPs}).
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%% append_lpns([], BPs) ->
|
||||||
|
%% BPs;
|
||||||
|
%% append_lpns(LPNs, BPs) ->
|
||||||
|
%% lists:reverse(LPNs) ++ BPs.
|
||||||
|
|
||||||
-record(oid_map, {
|
%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
next :: non_neg_integer(),
|
|
||||||
map :: dict()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-define(DICTMOD, dict).
|
%% -record(oid_map, {
|
||||||
|
%% next :: non_neg_integer(),
|
||||||
|
%% map :: dict()
|
||||||
|
%% }).
|
||||||
|
|
||||||
fresh() ->
|
%% -define(DICTMOD, dict).
|
||||||
#oid_map{next=1,
|
|
||||||
map=?DICTMOD:new()}.
|
|
||||||
|
|
||||||
do_pure_op({get, Key}, #oid_map{map=Dict}) ->
|
%% fresh() ->
|
||||||
?DICTMOD:find(Key, Dict).
|
%% #oid_map{next=1,
|
||||||
|
%% map=?DICTMOD:new()}.
|
||||||
|
|
||||||
do_dirty_op({new_oid, _Key, _From}=Op,
|
%% do_pure_op({get, Key}, #oid_map{map=Dict}) ->
|
||||||
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
%% ?DICTMOD:find(Key, Dict).
|
||||||
Page = term_to_binary(Op),
|
|
||||||
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
|
||||||
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
|
|
||||||
[StreamNum]),
|
|
||||||
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
|
||||||
{ok, I_State, Proj1, LPN, NewBackPs}.
|
|
||||||
|
|
||||||
play_log_mutate_i_state(Pages, SideEffectsP, I_State) ->
|
%% do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op,
|
||||||
lists:foldl(fun({new_oid, Key, From}=_Op, #oid_map{map=Dict, next=Next}=O) ->
|
%% I_State, StreamNum, Proj0, PageSize) ->
|
||||||
{Res, O2} =
|
%% Page = term_to_binary(Op),
|
||||||
case ?DICTMOD:find(Key, Dict) of
|
%% FullPage = tango:pack_v1([StreamNum], [to_final_page],
|
||||||
error ->
|
%% Page, PageSize),
|
||||||
Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
%% left off here,
|
||||||
{{ok, Next},O#oid_map{map=Dict2,
|
%% {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]),
|
||||||
next=Next + 1}};
|
%% {ok, I_State, Proj1, LPN}.
|
||||||
{ok, _} ->
|
|
||||||
{already_exists, O}
|
%% play_log_mutate_i_state(Pages, SideEffectsP, I_State) ->
|
||||||
end,
|
%% lists:foldl(fun({new_oid, Key, From, _NumOfAttempts}=_Op,
|
||||||
if SideEffectsP ->
|
%% #oid_map{map=Dict, next=Next}=O) ->
|
||||||
gen_server:reply(From, Res);
|
%% {Res, O2} =
|
||||||
true ->
|
%% case ?DICTMOD:find(Key, Dict) of
|
||||||
ok
|
%% error ->
|
||||||
end,
|
%% Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
||||||
O2
|
%% {{ok, Next},O#oid_map{map=Dict2,
|
||||||
end,
|
%% next=Next + 1}};
|
||||||
I_State,
|
%% {ok, _} ->
|
||||||
[binary_to_term(Page) || Page <- Pages]).
|
%% {already_exists, O}
|
||||||
|
%% end,
|
||||||
|
%% if SideEffectsP ->
|
||||||
|
%% gen_server:reply(From, Res);
|
||||||
|
%% true ->
|
||||||
|
%% ok
|
||||||
|
%% end,
|
||||||
|
%% O2
|
||||||
|
%% end,
|
||||||
|
%% I_State,
|
||||||
|
%% [binary_to_term(Page) || Page <- Pages]).
|
||||||
|
|
||||||
|
|
|
@ -57,28 +57,35 @@ smoke_test() ->
|
||||||
MLP0 = NumFLUs,
|
MLP0 = NumFLUs,
|
||||||
NumFLUs = ?M:get_max_logical_page(FLUs),
|
NumFLUs = ?M:get_max_logical_page(FLUs),
|
||||||
|
|
||||||
%% Excellent. Now let's start the sequencer and see if it gets
|
|
||||||
%% the same answer. If yes, then the first get will return MLP1,
|
|
||||||
%% yadda yadda.
|
|
||||||
MLP1 = MLP0 + 1,
|
|
||||||
MLP3 = MLP0 + 3,
|
|
||||||
MLP4 = MLP0 + 4,
|
|
||||||
{ok, Sequencer} = ?M:start_link(FLUs),
|
{ok, Sequencer} = ?M:start_link(FLUs),
|
||||||
try
|
try
|
||||||
|
{ok, _} = ?M:get(Sequencer, 5000),
|
||||||
[{Stream9, Tails9}] = StreamTails = [{9, [1125, 1124, 1123]}],
|
[{Stream9, Tails9}] = StreamTails = [{9, [1125, 1124, 1123]}],
|
||||||
ok = ?M:set_tails(Sequencer, StreamTails),
|
ok = ?M:set_tails(Sequencer, StreamTails),
|
||||||
{ok, [Tails9]} = ?M:get_tails(Sequencer, [Stream9]),
|
{ok, _, [Tails9]} = ?M:get_tails(Sequencer, 0, [Stream9]),
|
||||||
|
|
||||||
{ok, LPN1} = ?M:get(Sequencer, 2),
|
{ok, LPN0a} = ?M:get(Sequencer, 2),
|
||||||
{ok, LPN3} = ?M:get(Sequencer, 1, [2]),
|
{ok, LPN0b} = ?M:get(Sequencer, 0),
|
||||||
{ok, LPN4} = ?M:get(Sequencer, 1, [1]),
|
LPN0a = LPN0b - 2,
|
||||||
{ok, LPN5} = ?M:get(Sequencer, 1, [2]),
|
|
||||||
{ok, LPN6} = ?M:get(Sequencer, 1, [2]),
|
|
||||||
{ok, LPN7} = ?M:get(Sequencer, 1, [2]),
|
|
||||||
{ok, LPN8} = ?M:get(Sequencer, 1, [2]),
|
|
||||||
|
|
||||||
{ok, [[LPN4], [LPN8, LPN7, LPN6, LPN5]]} = ?M:get_tails(Sequencer,
|
{ok, LPN2a, _} = ?M:get_tails(Sequencer, 1, [2]),
|
||||||
[1,2])
|
{ok, LPN1a, _} = ?M:get_tails(Sequencer, 1, [1]),
|
||||||
|
{ok, _, [[LPN1a], [LPN2a]]} = ?M:get_tails(Sequencer,
|
||||||
|
0, [1,2]),
|
||||||
|
{ok, LPN2b, _} = ?M:get_tails(Sequencer, 1, [2]),
|
||||||
|
{ok, LPN2c, _} = ?M:get_tails(Sequencer, 1, [2]),
|
||||||
|
{ok, _, [[LPN1a], [LPN2c, LPN2b, LPN2a]]} =
|
||||||
|
?M:get_tails(Sequencer, 0, [1,2]),
|
||||||
|
{ok, LPN2d, _} = ?M:get_tails(Sequencer, 1, [2]),
|
||||||
|
{ok, LPN2e, _} = ?M:get_tails(Sequencer, 1, [2]),
|
||||||
|
|
||||||
|
{ok, LPNX, [[LP1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} =
|
||||||
|
?M:get_tails(Sequencer, 0, [1,2]),
|
||||||
|
{ok, LPNX, [[LP1a], [LPN2e, LPN2d, LPN2c, LPN2b]]} =
|
||||||
|
?M:get_tails(Sequencer, 0, [1,2]), % same results
|
||||||
|
LPNX = LPN2e + 1, % no change with 0 request
|
||||||
|
|
||||||
|
ok
|
||||||
after
|
after
|
||||||
?M:stop(Sequencer)
|
?M:stop(Sequencer)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue