Added src/tango_dt_queue.erl plus test

This commit is contained in:
Scott Lystig Fritchie 2014-08-31 17:49:45 +09:00
parent 6caeaeb6b5
commit 1c1e1368dd
6 changed files with 200 additions and 48 deletions

View file

@ -126,7 +126,12 @@ handle_call({get, NumPages, StreamList, LC}, _From,
{Tab, MLP + NumPages, BadPercent, MaxDifference}}; {Tab, MLP + NumPages, BadPercent, MaxDifference}};
handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) -> handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) ->
Tab = element(1, MLP_tuple), Tab = element(1, MLP_tuple),
Tails = [(catch ets:lookup_element(Tab, Stream, 2)) || Stream <- StreamList], Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of
{'EXIT', _} ->
1;
Res ->
Res
end || Stream <- StreamList],
NewLC = lclock_update(LC), NewLC = lclock_update(LC),
{reply, {{ok, Tails}, NewLC}, MLP_tuple}; {reply, {{ok, Tails}, NewLC}, MLP_tuple};
handle_call({set_tails, StreamTails}, _From, MLP_tuple) -> handle_call({set_tails, StreamTails}, _From, MLP_tuple) ->

View file

@ -29,10 +29,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% Tango datatype callbacks (prototype)
-export([fresh/0,
do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000). -define(LONG_TIME, 30*1000).
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
@ -51,10 +47,11 @@
}). }).
-type callback_i_state() :: term(). -type callback_i_state() :: term().
-type gen_server_from() :: {pid(), Tag::term()}.
-callback fresh() -> callback_i_state(). -callback fresh() -> callback_i_state().
-callback do_pure_op(term(), callback_i_state()) -> term(). -callback do_pure_op(term(), callback_i_state()) -> term().
-callback do_dirty_op(term(), callback_i_state(), -callback do_dirty_op(term(), gen_server_from(), callback_i_state(),
StreamNum::non_neg_integer(), StreamNum::non_neg_integer(),
Proj0::term(), PageSize::non_neg_integer(), Proj0::term(), PageSize::non_neg_integer(),
BackPs::list()) -> BackPs::list()) ->
@ -86,15 +83,21 @@ init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) ->
back_ps=BackPs, back_ps=BackPs,
i_state=I_State}}. i_state=I_State}}.
handle_call({cb_dirty_op, Op}, _From, handle_call({cb_dirty_op, Op}, From,
#state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum, #state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum,
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) -> page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
{_Res, I_State2, Proj1, _LPN, NewBackPs} = {AsyncType, I_State2, Proj1, LPN, NewBackPs} =
CallbackMod:do_dirty_op(Op, I_State, StreamNum, CallbackMod:do_dirty_op(Op, From, I_State, StreamNum,
Proj0, PageSize, BackPs), Proj0, PageSize, BackPs),
{reply, ok, State#state{i_state=I_State2, State2 = State#state{i_state=I_State2,
proj=Proj1, proj=Proj1,
back_ps=NewBackPs}}; back_ps=NewBackPs},
if AsyncType == op_t_async ->
{reply, ok, State2};
AsyncType == op_t_sync ->
State3 = roll_log_forward(LPN, State2),
{noreply, State3}
end;
handle_call({cb_pure_op, Op}, _From, #state{cb_mod=CallbackMod} = State) -> handle_call({cb_pure_op, Op}, _From, #state{cb_mod=CallbackMod} = State) ->
State2 = #state{i_state=I_State} = roll_log_forward(State), State2 = #state{i_state=I_State} = roll_log_forward(State),
Reply = CallbackMod:do_pure_op(Op, I_State), Reply = CallbackMod:do_pure_op(Op, I_State),
@ -125,19 +128,21 @@ find_last_lpn(SequencerPid) ->
{ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
CurrentLPN - 1. CurrentLPN - 1.
fetch_unread_pages(#state{seq=SequencerPid, proj=Proj, stream_num=StreamNum, fetch_unread_pages(#state{seq=SequencerPid, stream_num=StreamNum} = State) ->
last_read_lpn=StopAtLPN} = State) ->
{ok, [LastLPN]} = corfurl_sequencer:get_tails(SequencerPid, {ok, [LastLPN]} = corfurl_sequencer:get_tails(SequencerPid,
[StreamNum]), [StreamNum]),
fetch_unread_pages2(LastLPN, State).
fetch_unread_pages(LastLPN, State) ->
fetch_unread_pages2(LastLPN, State).
fetch_unread_pages2(LastLPN,
#state{proj=Proj, stream_num=StreamNum,
last_read_lpn=StopAtLPN} = State) ->
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum), {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum),
%% TODO ????
%% LastReadLPN = if BackPs == [] -> 0;
%% true -> hd(BackPs)
%% end,
{Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}. {Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}.
fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum) -> fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum) ->
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN, LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN,
StopAtLPN, true), StopAtLPN, true),
{LPNs, Pages} = lists:unzip(LPNandPages), {LPNs, Pages} = lists:unzip(LPNandPages),
@ -157,27 +162,7 @@ roll_log_forward(State) ->
{Pages, State2} = fetch_unread_pages(State), {Pages, State2} = fetch_unread_pages(State),
play_log_pages(Pages, true, State2). play_log_pages(Pages, true, State2).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% roll_log_forward(MaybeStartingLPN, State) ->
{Pages, State2} = fetch_unread_pages(MaybeStartingLPN, State),
fresh() -> play_log_pages(Pages, true, State2).
undefined.
do_pure_op({o_get}, Register) ->
Register.
do_dirty_op({o_set, _Val}=Op,
I_State, StreamNum, Proj0, PageSize, BackPs) ->
Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN),
{ok, I_State, Proj1, LPN, NewBackPs}.
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
lists:foldl(fun({o_set, Val}=_Op, _OldVal) ->
Val
end,
I_State,
[binary_to_term(Page) || Page <- Pages]).

View file

@ -27,7 +27,7 @@
%% Tango datatype callbacks %% Tango datatype callbacks
-export([fresh/0, -export([fresh/0,
do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]).
-define(DICTMOD, dict). -define(DICTMOD, dict).
@ -51,14 +51,14 @@ fresh() ->
do_pure_op({o_get, Key}, Dict) -> do_pure_op({o_get, Key}, Dict) ->
?DICTMOD:find(Key, Dict). ?DICTMOD:find(Key, Dict).
do_dirty_op({o_set, _Key, _Val}=Op, do_dirty_op({o_set, _Key, _Val}=Op, _From,
I_State, StreamNum, Proj0, PageSize, BackPs) -> I_State, StreamNum, Proj0, PageSize, BackPs) ->
Page = term_to_binary(Op), Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]), [StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN), NewBackPs = tango:add_back_pointer(BackPs, LPN),
{ok, I_State, Proj1, LPN, NewBackPs}. {op_t_async, I_State, Proj1, LPN, NewBackPs}.
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
lists:foldl(fun({o_set, Key, Val}=_Op, Dict) -> lists:foldl(fun({o_set, Key, Val}=_Op, Dict) ->

View file

@ -0,0 +1,122 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(tango_dt_queue).
-behaviour(tango_dt).
-export([start_link/4,
is_empty/1, length/1, peek/1, to_list/1, member/2,
in/2, out/1, reverse/1, filter/2]).
%% Tango datatype callbacks
-export([fresh/0,
do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000).
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
start_link(PageSize, SequencerPid, Proj, StreamNum) ->
gen_server:start_link(tango_dt,
[PageSize, SequencerPid, Proj, ?MODULE, StreamNum],
[]).
%% set(Pid, Val) ->
%% gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME).
is_empty(Pid) ->
gen_server:call(Pid, {cb_pure_op, {o_is_empty}}, ?LONG_TIME).
length(Pid) ->
gen_server:call(Pid, {cb_pure_op, {o_length}}, ?LONG_TIME).
peek(Pid) ->
gen_server:call(Pid, {cb_pure_op, {o_peek}}, ?LONG_TIME).
to_list(Pid) ->
gen_server:call(Pid, {cb_pure_op, {o_to_list}}, ?LONG_TIME).
member(Pid, X) ->
gen_server:call(Pid, {cb_pure_op, {o_member, X}}, ?LONG_TIME).
in(Pid, Val) ->
gen_server:call(Pid, {cb_dirty_op, {o_in, Val}}, ?LONG_TIME).
out(Pid) ->
gen_server:call(Pid, {cb_dirty_op, {o_out}}, ?LONG_TIME).
reverse(Pid) ->
gen_server:call(Pid, {cb_dirty_op, {o_reverse}}, ?LONG_TIME).
filter(Pid, Fun) ->
gen_server:call(Pid, {cb_dirty_op, {o_filter, Fun}}, ?LONG_TIME).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
fresh() ->
queue:new().
do_pure_op({o_is_empty}, Q) ->
{ok, queue:is_empty(Q)};
do_pure_op({o_length}, Q) ->
{ok, queue:len(Q)};
do_pure_op({o_peek}, Q) ->
{ok, queue:peek(Q)};
do_pure_op({o_to_list}, Q) ->
{ok, queue:to_list(Q)};
do_pure_op({o_member, X}, Q) ->
{ok, queue:member(X, Q)}.
do_dirty_op(Op0, From,
I_State, StreamNum, Proj0, PageSize, BackPs) ->
{AsyncType, Op} = transform_dirty_op(Op0, From),
Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN),
{AsyncType, I_State, Proj1, LPN, NewBackPs}.
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
lists:foldl(fun({o_in, Val}=_Op, Q) ->
queue:in(Val, Q);
({o_out, From, Node, WritingPid}, Q) ->
{Reply, NewQ} = queue:out(Q),
if Node == node(), WritingPid == self() ->
gen_server:reply(From, {ok, Reply});
true ->
ok
end,
NewQ;
({o_reverse}, Q) ->
queue:reverse(Q);
({o_filter, Fun}, Q) ->
queue:filter(Fun, Q)
end,
I_State,
[binary_to_term(Page) || Page <- Pages]).
transform_dirty_op({o_out}, From) ->
%% This func will be executed on the server side prior to writing
%% to the log.
{op_t_sync, {o_out, From, node(), self()}};
transform_dirty_op(Op, _From) ->
{op_t_async, Op}.

View file

@ -27,7 +27,7 @@
%% Tango datatype callbacks %% Tango datatype callbacks
-export([fresh/0, -export([fresh/0,
do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]).
-define(LONG_TIME, 30*1000). -define(LONG_TIME, 30*1000).
@ -49,14 +49,14 @@ fresh() ->
do_pure_op({o_get}, Register) -> do_pure_op({o_get}, Register) ->
{ok, Register}. {ok, Register}.
do_dirty_op({o_set, _Val}=Op, do_dirty_op({o_set, _Val}=Op, _From,
I_State, StreamNum, Proj0, PageSize, BackPs) -> I_State, StreamNum, Proj0, PageSize, BackPs) ->
Page = term_to_binary(Op), Page = term_to_binary(Op),
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
[StreamNum]), [StreamNum]),
NewBackPs = tango:add_back_pointer(BackPs, LPN), NewBackPs = tango:add_back_pointer(BackPs, LPN),
{ok, I_State, Proj1, LPN, NewBackPs}. {op_t_async, I_State, Proj1, LPN, NewBackPs}.
play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> play_log_mutate_i_state(Pages, _SideEffectsP, I_State) ->
lists:foldl(fun({o_set, Val}=_Op, _OldVal) -> lists:foldl(fun({o_set, Val}=_Op, _OldVal) ->

View file

@ -35,6 +35,8 @@
-define(SEQ, corfurl_sequencer). -define(SEQ, corfurl_sequencer).
-define(T, tango). -define(T, tango).
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
-ifdef(TEST). -ifdef(TEST).
-ifndef(PULSE). -ifndef(PULSE).
@ -228,5 +230,43 @@ tango_dt_map_int(PageSize, Seq, Proj) ->
ok. ok.
tango_dt_queue_test() ->
ok = run_test("/tmp", "tango_dt_queue",
4096, 5*1024, 1, fun tango_dt_queue_int/3).
tango_dt_queue_int(PageSize, Seq, Proj) ->
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
{ok, Q1Num} = tango_oid:new(OID_Map, "queue1"),
{ok, Q1} = tango_dt_queue:start_link(PageSize, Seq, Proj, Q1Num),
{ok, true} = tango_dt_queue:is_empty(Q1),
{ok, 0} = tango_dt_queue:length(Q1),
Num1 = 5,
Seq1 = lists:seq(1, Num1),
RevSeq1 = lists:reverse(Seq1),
[ok = tango_dt_queue:in(Q1, X) || X <- Seq1],
{ok, Num1} = tango_dt_queue:length(Q1),
{ok, {value, 1}} = tango_dt_queue:peek(Q1),
{ok, Seq1} = tango_dt_queue:to_list(Q1),
ok = tango_dt_queue:reverse(Q1),
{ok, RevSeq1} = tango_dt_queue:to_list(Q1),
ok = tango_dt_queue:reverse(Q1),
[{ok, {value, X}} = tango_dt_queue:out(Q1) || X <- lists:seq(1, Num1)],
{ok, empty} = tango_dt_queue:out(Q1),
{ok, []} = tango_dt_queue:to_list(Q1),
[ok = tango_dt_queue:in(Q1, X) || X <- Seq1],
{ok, false} = tango_dt_queue:member(Q1, does_not_exist),
{ok, true} = tango_dt_queue:member(Q1, Num1),
ok = tango_dt_queue:filter(Q1, fun(X) when X == Num1 -> false;
(_) -> true
end),
{ok, false} = tango_dt_queue:member(Q1, Num1),
{ok, true} = tango_dt_queue:member(Q1, Num1 - 1),
ok.
-endif. % not PULSE -endif. % not PULSE
-endif. % TEST -endif. % TEST