From 1c1e1368dddb178a7b34e401f62b19965d5be0d4 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sun, 31 Aug 2014 17:49:45 +0900 Subject: [PATCH] Added src/tango_dt_queue.erl plus test --- .../tango-prototype/src/corfurl_sequencer.erl | 7 +- prototype/tango-prototype/src/tango_dt.erl | 67 ++++------ .../tango-prototype/src/tango_dt_map.erl | 6 +- .../tango-prototype/src/tango_dt_queue.erl | 122 ++++++++++++++++++ .../tango-prototype/src/tango_dt_register.erl | 6 +- prototype/tango-prototype/test/tango_test.erl | 40 ++++++ 6 files changed, 200 insertions(+), 48 deletions(-) create mode 100644 prototype/tango-prototype/src/tango_dt_queue.erl diff --git a/prototype/tango-prototype/src/corfurl_sequencer.erl b/prototype/tango-prototype/src/corfurl_sequencer.erl index 8aedd8c..6e869e1 100644 --- a/prototype/tango-prototype/src/corfurl_sequencer.erl +++ b/prototype/tango-prototype/src/corfurl_sequencer.erl @@ -126,7 +126,12 @@ handle_call({get, NumPages, StreamList, LC}, _From, {Tab, MLP + NumPages, BadPercent, MaxDifference}}; handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) -> Tab = element(1, MLP_tuple), - Tails = [(catch ets:lookup_element(Tab, Stream, 2)) || Stream <- StreamList], + Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of + {'EXIT', _} -> + 1; + Res -> + Res + end || Stream <- StreamList], NewLC = lclock_update(LC), {reply, {{ok, Tails}, NewLC}, MLP_tuple}; handle_call({set_tails, StreamTails}, _From, MLP_tuple) -> diff --git a/prototype/tango-prototype/src/tango_dt.erl b/prototype/tango-prototype/src/tango_dt.erl index a851e27..c0c4879 100644 --- a/prototype/tango-prototype/src/tango_dt.erl +++ b/prototype/tango-prototype/src/tango_dt.erl @@ -29,10 +29,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% Tango datatype callbacks (prototype) --export([fresh/0, - do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). - -define(LONG_TIME, 30*1000). -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). @@ -51,10 +47,11 @@ }). -type callback_i_state() :: term(). +-type gen_server_from() :: {pid(), Tag::term()}. -callback fresh() -> callback_i_state(). -callback do_pure_op(term(), callback_i_state()) -> term(). --callback do_dirty_op(term(), callback_i_state(), +-callback do_dirty_op(term(), gen_server_from(), callback_i_state(), StreamNum::non_neg_integer(), Proj0::term(), PageSize::non_neg_integer(), BackPs::list()) -> @@ -86,15 +83,21 @@ init([PageSize, SequencerPid, Proj, CallbackMod, StreamNum]) -> back_ps=BackPs, i_state=I_State}}. -handle_call({cb_dirty_op, Op}, _From, +handle_call({cb_dirty_op, Op}, From, #state{proj=Proj0, cb_mod=CallbackMod, stream_num=StreamNum, page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) -> - {_Res, I_State2, Proj1, _LPN, NewBackPs} = - CallbackMod:do_dirty_op(Op, I_State, StreamNum, + {AsyncType, I_State2, Proj1, LPN, NewBackPs} = + CallbackMod:do_dirty_op(Op, From, I_State, StreamNum, Proj0, PageSize, BackPs), - {reply, ok, State#state{i_state=I_State2, - proj=Proj1, - back_ps=NewBackPs}}; + State2 = State#state{i_state=I_State2, + proj=Proj1, + back_ps=NewBackPs}, + if AsyncType == op_t_async -> + {reply, ok, State2}; + AsyncType == op_t_sync -> + State3 = roll_log_forward(LPN, State2), + {noreply, State3} + end; handle_call({cb_pure_op, Op}, _From, #state{cb_mod=CallbackMod} = State) -> State2 = #state{i_state=I_State} = roll_log_forward(State), Reply = CallbackMod:do_pure_op(Op, I_State), @@ -125,19 +128,21 @@ find_last_lpn(SequencerPid) -> {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), CurrentLPN - 1. -fetch_unread_pages(#state{seq=SequencerPid, proj=Proj, stream_num=StreamNum, - last_read_lpn=StopAtLPN} = State) -> +fetch_unread_pages(#state{seq=SequencerPid, stream_num=StreamNum} = State) -> {ok, [LastLPN]} = corfurl_sequencer:get_tails(SequencerPid, [StreamNum]), + fetch_unread_pages2(LastLPN, State). + +fetch_unread_pages(LastLPN, State) -> + fetch_unread_pages2(LastLPN, State). + +fetch_unread_pages2(LastLPN, + #state{proj=Proj, stream_num=StreamNum, + last_read_lpn=StopAtLPN} = State) -> {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum), - %% TODO ???? - %% LastReadLPN = if BackPs == [] -> 0; - %% true -> hd(BackPs) - %% end, {Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}. fetch_unread_pages(Proj, LastLPN, StopAtLPN, StreamNum) -> - %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), LPNandPages = tango:scan_backward(Proj, StreamNum, LastLPN, StopAtLPN, true), {LPNs, Pages} = lists:unzip(LPNandPages), @@ -157,27 +162,7 @@ roll_log_forward(State) -> {Pages, State2} = fetch_unread_pages(State), play_log_pages(Pages, true, State2). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +roll_log_forward(MaybeStartingLPN, State) -> + {Pages, State2} = fetch_unread_pages(MaybeStartingLPN, State), + play_log_pages(Pages, true, State2). -fresh() -> - undefined. - -do_pure_op({o_get}, Register) -> - Register. - -do_dirty_op({o_set, _Val}=Op, - I_State, StreamNum, Proj0, PageSize, BackPs) -> - Page = term_to_binary(Op), - FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, - [StreamNum]), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {ok, I_State, Proj1, LPN, NewBackPs}. - -play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> - lists:foldl(fun({o_set, Val}=_Op, _OldVal) -> - Val - end, - I_State, - [binary_to_term(Page) || Page <- Pages]). - diff --git a/prototype/tango-prototype/src/tango_dt_map.erl b/prototype/tango-prototype/src/tango_dt_map.erl index 1a4276b..7d406ba 100644 --- a/prototype/tango-prototype/src/tango_dt_map.erl +++ b/prototype/tango-prototype/src/tango_dt_map.erl @@ -27,7 +27,7 @@ %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). + do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]). -define(DICTMOD, dict). @@ -51,14 +51,14 @@ fresh() -> do_pure_op({o_get, Key}, Dict) -> ?DICTMOD:find(Key, Dict). -do_dirty_op({o_set, _Key, _Val}=Op, +do_dirty_op({o_set, _Key, _Val}=Op, _From, I_State, StreamNum, Proj0, PageSize, BackPs) -> Page = term_to_binary(Op), FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, [StreamNum]), NewBackPs = tango:add_back_pointer(BackPs, LPN), - {ok, I_State, Proj1, LPN, NewBackPs}. + {op_t_async, I_State, Proj1, LPN, NewBackPs}. play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> lists:foldl(fun({o_set, Key, Val}=_Op, Dict) -> diff --git a/prototype/tango-prototype/src/tango_dt_queue.erl b/prototype/tango-prototype/src/tango_dt_queue.erl new file mode 100644 index 0000000..b9555a9 --- /dev/null +++ b/prototype/tango-prototype/src/tango_dt_queue.erl @@ -0,0 +1,122 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(tango_dt_queue). + +-behaviour(tango_dt). + +-export([start_link/4, + is_empty/1, length/1, peek/1, to_list/1, member/2, + in/2, out/1, reverse/1, filter/2]). + +%% Tango datatype callbacks +-export([fresh/0, + do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]). + +-define(LONG_TIME, 30*1000). + +-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). + +start_link(PageSize, SequencerPid, Proj, StreamNum) -> + gen_server:start_link(tango_dt, + [PageSize, SequencerPid, Proj, ?MODULE, StreamNum], + []). + +%% set(Pid, Val) -> +%% gen_server:call(Pid, {cb_dirty_op, {o_set, Val}}, ?LONG_TIME). + +is_empty(Pid) -> + gen_server:call(Pid, {cb_pure_op, {o_is_empty}}, ?LONG_TIME). + +length(Pid) -> + gen_server:call(Pid, {cb_pure_op, {o_length}}, ?LONG_TIME). + +peek(Pid) -> + gen_server:call(Pid, {cb_pure_op, {o_peek}}, ?LONG_TIME). + +to_list(Pid) -> + gen_server:call(Pid, {cb_pure_op, {o_to_list}}, ?LONG_TIME). + +member(Pid, X) -> + gen_server:call(Pid, {cb_pure_op, {o_member, X}}, ?LONG_TIME). + +in(Pid, Val) -> + gen_server:call(Pid, {cb_dirty_op, {o_in, Val}}, ?LONG_TIME). + +out(Pid) -> + gen_server:call(Pid, {cb_dirty_op, {o_out}}, ?LONG_TIME). + +reverse(Pid) -> + gen_server:call(Pid, {cb_dirty_op, {o_reverse}}, ?LONG_TIME). + +filter(Pid, Fun) -> + gen_server:call(Pid, {cb_dirty_op, {o_filter, Fun}}, ?LONG_TIME). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +fresh() -> + queue:new(). + +do_pure_op({o_is_empty}, Q) -> + {ok, queue:is_empty(Q)}; +do_pure_op({o_length}, Q) -> + {ok, queue:len(Q)}; +do_pure_op({o_peek}, Q) -> + {ok, queue:peek(Q)}; +do_pure_op({o_to_list}, Q) -> + {ok, queue:to_list(Q)}; +do_pure_op({o_member, X}, Q) -> + {ok, queue:member(X, Q)}. + +do_dirty_op(Op0, From, + I_State, StreamNum, Proj0, PageSize, BackPs) -> + {AsyncType, Op} = transform_dirty_op(Op0, From), + Page = term_to_binary(Op), + FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), + {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, + [StreamNum]), + NewBackPs = tango:add_back_pointer(BackPs, LPN), + {AsyncType, I_State, Proj1, LPN, NewBackPs}. + +play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> + lists:foldl(fun({o_in, Val}=_Op, Q) -> + queue:in(Val, Q); + ({o_out, From, Node, WritingPid}, Q) -> + {Reply, NewQ} = queue:out(Q), + if Node == node(), WritingPid == self() -> + gen_server:reply(From, {ok, Reply}); + true -> + ok + end, + NewQ; + ({o_reverse}, Q) -> + queue:reverse(Q); + ({o_filter, Fun}, Q) -> + queue:filter(Fun, Q) + end, + I_State, + [binary_to_term(Page) || Page <- Pages]). + +transform_dirty_op({o_out}, From) -> + %% This func will be executed on the server side prior to writing + %% to the log. + {op_t_sync, {o_out, From, node(), self()}}; +transform_dirty_op(Op, _From) -> + {op_t_async, Op}. diff --git a/prototype/tango-prototype/src/tango_dt_register.erl b/prototype/tango-prototype/src/tango_dt_register.erl index 3c81e05..9225e16 100644 --- a/prototype/tango-prototype/src/tango_dt_register.erl +++ b/prototype/tango-prototype/src/tango_dt_register.erl @@ -27,7 +27,7 @@ %% Tango datatype callbacks -export([fresh/0, - do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). + do_pure_op/2, do_dirty_op/7, play_log_mutate_i_state/3]). -define(LONG_TIME, 30*1000). @@ -49,14 +49,14 @@ fresh() -> do_pure_op({o_get}, Register) -> {ok, Register}. -do_dirty_op({o_set, _Val}=Op, +do_dirty_op({o_set, _Val}=Op, _From, I_State, StreamNum, Proj0, PageSize, BackPs) -> Page = term_to_binary(Op), FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage, [StreamNum]), NewBackPs = tango:add_back_pointer(BackPs, LPN), - {ok, I_State, Proj1, LPN, NewBackPs}. + {op_t_async, I_State, Proj1, LPN, NewBackPs}. play_log_mutate_i_state(Pages, _SideEffectsP, I_State) -> lists:foldl(fun({o_set, Val}=_Op, _OldVal) -> diff --git a/prototype/tango-prototype/test/tango_test.erl b/prototype/tango-prototype/test/tango_test.erl index b5c1e2e..49866b6 100644 --- a/prototype/tango-prototype/test/tango_test.erl +++ b/prototype/tango-prototype/test/tango_test.erl @@ -35,6 +35,8 @@ -define(SEQ, corfurl_sequencer). -define(T, tango). +-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). + -ifdef(TEST). -ifndef(PULSE). @@ -228,5 +230,43 @@ tango_dt_map_int(PageSize, Seq, Proj) -> ok. +tango_dt_queue_test() -> + ok = run_test("/tmp", "tango_dt_queue", + 4096, 5*1024, 1, fun tango_dt_queue_int/3). + +tango_dt_queue_int(PageSize, Seq, Proj) -> + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), + + {ok, Q1Num} = tango_oid:new(OID_Map, "queue1"), + {ok, Q1} = tango_dt_queue:start_link(PageSize, Seq, Proj, Q1Num), + + {ok, true} = tango_dt_queue:is_empty(Q1), + {ok, 0} = tango_dt_queue:length(Q1), + Num1 = 5, + Seq1 = lists:seq(1, Num1), + RevSeq1 = lists:reverse(Seq1), + [ok = tango_dt_queue:in(Q1, X) || X <- Seq1], + {ok, Num1} = tango_dt_queue:length(Q1), + {ok, {value, 1}} = tango_dt_queue:peek(Q1), + {ok, Seq1} = tango_dt_queue:to_list(Q1), + ok = tango_dt_queue:reverse(Q1), + {ok, RevSeq1} = tango_dt_queue:to_list(Q1), + ok = tango_dt_queue:reverse(Q1), + + [{ok, {value, X}} = tango_dt_queue:out(Q1) || X <- lists:seq(1, Num1)], + {ok, empty} = tango_dt_queue:out(Q1), + {ok, []} = tango_dt_queue:to_list(Q1), + + [ok = tango_dt_queue:in(Q1, X) || X <- Seq1], + {ok, false} = tango_dt_queue:member(Q1, does_not_exist), + {ok, true} = tango_dt_queue:member(Q1, Num1), + ok = tango_dt_queue:filter(Q1, fun(X) when X == Num1 -> false; + (_) -> true + end), + {ok, false} = tango_dt_queue:member(Q1, Num1), + {ok, true} = tango_dt_queue:member(Q1, Num1 - 1), + + ok. + -endif. % not PULSE -endif. % TEST