From 9a3ac0241394dd183c5f7e6d4ee584739390adb9 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 20 Sep 2014 22:11:31 +0900 Subject: [PATCH] WIP: first round of tango_oid refactoring, all broken horribly --- prototype/tango-prototype/include/corfurl.hrl | 1 + prototype/tango-prototype/src/corfurl.erl | 2 +- prototype/tango-prototype/src/tango.erl | 21 +- prototype/tango-prototype/src/tango_oid.erl | 299 +++++++++--------- 4 files changed, 164 insertions(+), 159 deletions(-) diff --git a/prototype/tango-prototype/include/corfurl.hrl b/prototype/tango-prototype/include/corfurl.hrl index 59ec65a..819e79b 100644 --- a/prototype/tango-prototype/include/corfurl.hrl +++ b/prototype/tango-prototype/include/corfurl.hrl @@ -33,6 +33,7 @@ -record(proj, { % Projection dir :: string(), + page_size :: non_neg_integer(), epoch :: non_neg_integer(), seq :: 'undefined' | seq_name(), r :: [#range{}] diff --git a/prototype/tango-prototype/src/corfurl.erl b/prototype/tango-prototype/src/corfurl.erl index 1212a8a..8852ef2 100644 --- a/prototype/tango-prototype/src/corfurl.erl +++ b/prototype/tango-prototype/src/corfurl.erl @@ -372,5 +372,5 @@ simple_test_setup(RootDir, BaseDirName, PageSize, NumPages, NumFLUs) -> {ok, Seq} = corfurl_sequencer:start_link(FLUs), P0 = corfurl:new_simple_projection(PDir, 1, 1, 1*100, [FLUs]), - P1 = P0#proj{seq={Seq, unused, unused}}, + P1 = P0#proj{seq={Seq, unused, unused}, page_size=PageSize}, {FLUs, Seq, P1, DeleteFLUData}. diff --git a/prototype/tango-prototype/src/tango.erl b/prototype/tango-prototype/src/tango.erl index 4ba0126..519f873 100644 --- a/prototype/tango-prototype/src/tango.erl +++ b/prototype/tango-prototype/src/tango.erl @@ -157,30 +157,35 @@ append_page(Proj, Page, StreamList) -> append_page(Proj, _Page, _StreamList, 0) -> {{error_failed, ?MODULE, ?LINE}, Proj}; -append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, StreamList, Retries) -> +append_page(#proj{seq={Sequencer,_,_}, page_size=PageSize} = Proj, + OrigPage, StreamList, Retries) -> try - {ok, LPN} = corfurl_sequencer:get(Sequencer, 1, StreamList), + {ok, LPN, BackPsList} = corfurl_sequencer:get_tails(Sequencer, 1, + StreamList), %% pulse_tracing_add(write, LPN), - append_page1(Proj, LPN, Page, StreamList, 5) + Page = tango:pack_v1(StreamList, [to_final_page], + OrigPage, PageSize), + append_page1(Proj, LPN, Page, StreamList, 5, OrigPage) catch exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}} when Reason == noproc; Reason == normal -> NewSeq = corfurl_client:restart_sequencer(Proj), - append_page(Proj#proj{seq=NewSeq}, Page, StreamList, Retries); + append_page(Proj#proj{seq=NewSeq}, OrigPage, StreamList, Retries); exit:Exit -> {{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit} end. -append_page1(Proj, _LPN, _Page, _StreamList, 0) -> +append_page1(Proj, _LPN, _Page, _StreamList, 0, _OrigPage) -> {{error_failed, ?MODULE, ?LINE}, Proj}; -append_page1(Proj, LPN, Page, StreamList, Retries) -> +append_page1(Proj, LPN, Page, StreamList, Retries, OrigPage) -> case append_page2(Proj, LPN, Page) of lost_race -> - append_page(Proj, Page, StreamList, Retries - 1); + append_page(Proj, OrigPage, StreamList, Retries - 1); error_badepoch -> case corfurl_sequencer:poll_for_new_epoch_projection(Proj) of {ok, NewProj} -> - append_page1(NewProj, LPN, Page, StreamList, Retries - 1); + append_page1(NewProj, LPN, Page, StreamList, Retries - 1, + OrigPage); Else -> {Else, Proj} end; diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index 36d8b01..02ba712 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -20,193 +20,192 @@ -module(tango_oid). -%% -behaviour(gen_server). +-behaviour(gen_server). -%% %% API -%% -export([start_link/3, stop/1, -%% new/2, get/2]). +%% API +-export([start_link/3, stop/1, + new/2, get/2]). -%% %% gen_server callbacks -%% -export([init/1, handle_call/3, handle_cast/2, handle_info/2, -%% terminate/2, code_change/3]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -%% %% Tango datatype callbacks (prototype) -%% -export([fresh/0, -%% do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]). +%% Tango datatype callbacks (prototype) +-export([fresh/0, + do_pure_op/2, do_dirty_op/5, play_log_mutate_i_state/3]). -%% -define(SERVER, ?MODULE). -%% -define(OID_STREAM_NUMBER, 0). +-define(SERVER, ?MODULE). +-define(OID_STREAM_NUMBER, 0). -%% -define(LONG_TIME, 30*1000). +-define(LONG_TIME, 30*1000). -%% -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). +-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). -%% -type lpn() :: non_neg_integer(). +-type lpn() :: non_neg_integer(). -%% -record(state, { -%% page_size :: non_neg_integer(), % Corfurl page size -%% seq :: pid(), % sequencer pid -%% proj :: term(), % projection -%% last_fetch_lpn :: lpn(), % -%% all_back_ps :: [lpn()], % All back-pointers LIFO order! -%% i_state :: term() % internal state thingie -%% }). +-record(state, { + page_size :: non_neg_integer(), % Corfurl page size + seq :: pid(), % sequencer pid + proj :: term(), % projection + last_fetch_lpn :: lpn(), % + all_back_ps :: [lpn()], % All back-pointers LIFO order! + i_state :: term() % internal state thingie + }). -%% start_link(PageSize, SequencerPid, Proj) -> -%% gen_server:start_link(?MODULE, -%% [PageSize, SequencerPid, Proj], []). +start_link(PageSize, SequencerPid, Proj) -> + gen_server:start_link(?MODULE, + [PageSize, SequencerPid, Proj], []). -%% stop(Pid) -> -%% gen_server:call(Pid, {stop}, ?LONG_TIME). +stop(Pid) -> + gen_server:call(Pid, {stop}, ?LONG_TIME). -%% new(Pid, Key) -> -%% gen_server:call(Pid, {new, Key}, ?LONG_TIME). +new(Pid, Key) -> + gen_server:call(Pid, {new, Key}, ?LONG_TIME). -%% get(Pid, Key) -> -%% gen_server:call(Pid, {get, Key}, ?LONG_TIME). +get(Pid, Key) -> + gen_server:call(Pid, {get, Key}, ?LONG_TIME). -%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% init([PageSize, SequencerPid, Proj]) -> -%% LastLPN = find_last_lpn(SequencerPid), -%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), -%% BackPs = lists:reverse(LPNs), -%% LastFetchLPN = case LPNs of [] -> 0; -%% [H|_] -> H -%% end, -%% I_State = play_log_pages(Pages, fresh(), ?MODULE, false), -%% {ok, #state{page_size=PageSize, -%% seq=SequencerPid, -%% proj=Proj, -%% last_fetch_lpn=LastFetchLPN, -%% all_back_ps=BackPs, -%% i_state=I_State}}. +init([PageSize, SequencerPid, Proj]) -> + LastLPN = find_last_lpn(SequencerPid), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), + BackPs = lists:reverse(LPNs), + LastFetchLPN = back_ps2last_lpn(BackPs), + I_State = play_log_pages(Pages, fresh(), ?MODULE, false), + {ok, #state{page_size=PageSize, + seq=SequencerPid, + proj=Proj, + last_fetch_lpn=LastFetchLPN, + all_back_ps=BackPs, + i_state=I_State}}. -%% handle_call({new, Key}, From, -%% #state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) -> -%% Op = {new_oid, Key, From, 0}, -%% {_Res, I_State2, Proj1, LPN} = -%% do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize), -%% %% Let's see how much trouble we can get outselves in here. -%% %% If we're here, then we've written to the log without error. -%% %% So then the cast to roll forward must see that log entry -%% %% (if it also operates without error). So, the side-effect of -%% %% the op ought to always send a reply to the client. -%% gen_server:cast(self(), {roll_forward}), -%% {noreply, State#state{i_state=I_State2, -%% proj=Proj1}}; -%% handle_call({get, _Key}=Op, _From, State) -> -%% State2 = #state{i_state=I_State} = roll_log_forward(State), -%% Reply = do_pure_op(Op, I_State), -%% {reply, Reply, State2}; -%% handle_call({stop}, _From, State) -> -%% {stop, normal, ok, State}; -%% handle_call(_Request, _From, State) -> -%% Reply = whaaaaaaaaaaaa, -%% {reply, Reply, State}. +handle_call({new, Key}, From, + #state{proj=Proj0, page_size=PageSize, i_state=I_State}=State) -> + Op = {new_oid, Key, From, 0}, + {_Res, I_State2, Proj1, _LPN} = + do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize), + %% Let's see how much trouble we can get outselves in here. + %% If we're here, then we've written to the log without error. + %% So then the cast to roll forward must see that log entry + %% (if it also operates without error). So, the side-effect of + %% the op ought to always send a reply to the client. + gen_server:cast(self(), {roll_forward}), + {noreply, State#state{i_state=I_State2, + proj=Proj1}}; +handle_call({get, _Key}=Op, _From, State) -> + State2 = #state{i_state=I_State} = roll_log_forward(State), + Reply = do_pure_op(Op, I_State), + {reply, Reply, State2}; +handle_call({stop}, _From, State) -> + {stop, normal, ok, State}; +handle_call(_Request, _From, State) -> + Reply = whaaaaaaaaaaaa, + {reply, Reply, State}. -%% handle_cast({roll_forward}, State) -> -%% State2 = roll_log_forward(State), -%% {noreply, State2}; -%% handle_cast(_Msg, State) -> -%% {noreply, State}. +handle_cast({roll_forward}, State) -> + State2 = roll_log_forward(State), + {noreply, State2}; +handle_cast(_Msg, State) -> + {noreply, State}. -%% handle_info(_Info, State) -> -%% {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. -%% terminate(_Reason, _State) -> -%% ok. +terminate(_Reason, _State) -> + ok. -%% code_change(_OldVsn, State, _Extra) -> -%% {ok, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% find_last_lpn(#state{seq=SequencerPid}) -> -%% find_last_lpn(SequencerPid); -%% find_last_lpn(SequencerPid) -> -%% {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), -%% CurrentLPN - 1. +back_ps2last_lpn([]) -> + 0; +back_ps2last_lpn([H|_]) -> + H. + +find_last_lpn(#state{seq=SequencerPid}) -> + find_last_lpn(SequencerPid); +find_last_lpn(SequencerPid) -> + {ok, _, [BackPs]} = corfurl_sequencer:get_tails(SequencerPid, 0, [?OID_STREAM_NUMBER]), + back_ps2last_lpn(BackPs). -%% %% AAA refactor: return value changes here, propagate %% fetch_unread_pages(#state{proj=Proj, last_fetch_lpn=StopAtLPN, %% all_back_ps=BPs} = State) -> %% LastLPN = find_last_lpn(State), %% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), %% NewBPs = append_lpns(LPNs, BPs), -%% {LPNS, Pages, State#state{last_fetch_lpn=LastLPN, back_ps=BackPs}}. +%% {LPNs, Pages, State#state{last_fetch_lpn=LastLPN, all_back_ps=NewBPs}}. -%% fetch_unread_pages(Proj, LastLPN, StopAtLPN) -%% when LastLPN >= StopAtLPN -> -%% %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), -%% LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, -%% StopAtLPN, true), -%% {_LPNs, _Pages} = lists:unzip(LPNandPages). +fetch_unread_pages(Proj, LastLPN, StopAtLPN) + when LastLPN >= StopAtLPN -> + %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), + LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, + StopAtLPN, true), + {_LPNs, _Pages} = lists:unzip(LPNandPages). -%% play_log_pages(Pages, SideEffectsP, -%% #state{i_state=I_State} = State) -> -%% I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP), -%% State#state{i_state=I_State2}. +play_log_pages(Pages, SideEffectsP, + #state{i_state=I_State} = State) -> + I_State2 = play_log_pages(Pages, I_State, ?MODULE, SideEffectsP), + State#state{i_state=I_State2}. -%% play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> -%% CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). +play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> + CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). -%% roll_log_forward(#state{proj=Proj, all_back_ps=BackPs, -%% last_fetch_lpn=StopAtLPN} = State) -> -%% LastLPN = find_last_lpn(SequencerPid), -%% {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), -%% NewBPs = append_lpns(LPNs, BackPs), -%% play_log_pages(Pages, true, State2#state{all_back_ps=NewBPs}). +roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs, + last_fetch_lpn=StopAtLPN} = State) -> + LastLPN = find_last_lpn(SequencerPid), + {LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), + NewBPs = append_lpns(LPNs, BackPs), + play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}). -%% append_lpns([], BPs) -> -%% BPs; -%% append_lpns(LPNs, BPs) -> -%% lists:reverse(LPNs) ++ BPs. +append_lpns([], BPs) -> + BPs; +append_lpns(LPNs, BPs) -> + lists:reverse(LPNs) ++ BPs. -%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -record(oid_map, { -%% next :: non_neg_integer(), -%% map :: dict() -%% }). +-record(oid_map, { + next :: non_neg_integer(), + map :: dict() + }). -%% -define(DICTMOD, dict). +-define(DICTMOD, dict). -%% fresh() -> -%% #oid_map{next=1, -%% map=?DICTMOD:new()}. +fresh() -> + #oid_map{next=1, + map=?DICTMOD:new()}. -%% do_pure_op({get, Key}, #oid_map{map=Dict}) -> -%% ?DICTMOD:find(Key, Dict). +do_pure_op({get, Key}, #oid_map{map=Dict}) -> + ?DICTMOD:find(Key, Dict). -%% do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op, -%% I_State, StreamNum, Proj0, PageSize) -> -%% Page = term_to_binary(Op), -%% FullPage = tango:pack_v1([StreamNum], [to_final_page], -%% Page, PageSize), -%% left off here, -%% {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), -%% {ok, I_State, Proj1, LPN}. +do_dirty_op({new_oid, _Key, _From, _NumOfAttempts}=Op, + I_State, StreamNum, Proj0, ___TODO_delme_PageSize) -> + Page = term_to_binary(Op), + {{ok, LPN}, Proj1} = tango:append_page(Proj0, Page, [StreamNum]), + {ok, I_State, Proj1, LPN}. -%% play_log_mutate_i_state(Pages, SideEffectsP, I_State) -> -%% lists:foldl(fun({new_oid, Key, From, _NumOfAttempts}=_Op, -%% #oid_map{map=Dict, next=Next}=O) -> -%% {Res, O2} = -%% case ?DICTMOD:find(Key, Dict) of -%% error -> -%% Dict2 = ?DICTMOD:store(Key, Next, Dict), -%% {{ok, Next},O#oid_map{map=Dict2, -%% next=Next + 1}}; -%% {ok, _} -> -%% {already_exists, O} -%% end, -%% if SideEffectsP -> -%% gen_server:reply(From, Res); -%% true -> -%% ok -%% end, -%% O2 -%% end, -%% I_State, -%% [binary_to_term(Page) || Page <- Pages]). +play_log_mutate_i_state(Pages, SideEffectsP, I_State) -> + lists:foldl(fun({new_oid, Key, From, _NumOfAttempts}=_Op, + #oid_map{map=Dict, next=Next}=O) -> + {Res, O2} = + case ?DICTMOD:find(Key, Dict) of + error -> + Dict2 = ?DICTMOD:store(Key, Next, Dict), + {{ok, Next},O#oid_map{map=Dict2, + next=Next + 1}}; + {ok, _} -> + {already_exists, O} + end, + if SideEffectsP -> + gen_server:reply(From, Res); + true -> + ok + end, + O2 + end, + I_State, + [binary_to_term(Page) || Page <- Pages]).