diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index 1264e58..a94f06f 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -24,17 +24,20 @@ %% API -export([start_link/4, stop/1, - put/3, get/2]). + new/2, get/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% Tango datatype callbacks (prototype) +-export([fresh/0, + do_pure_op/2, do_dirty_op/6, play_log_mutate_i_state/3]). + -define(SERVER, ?MODULE). -define(OID_STREAM_NUMBER, 0). --define(LONG_TIME, 5*1000). -%% -define(LONG_TIME, 30*1000). +-define(LONG_TIME, 30*1000). -define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). @@ -58,8 +61,8 @@ start_link(PageSize, SequencerPid, Proj, CallbackMod) -> stop(Pid) -> gen_server:call(Pid, {stop}, ?LONG_TIME). -put(Pid, Key, Val) -> - gen_server:call(Pid, {put, Key, Val}, ?LONG_TIME). +new(Pid, Key) -> + gen_server:call(Pid, {new, Key}, ?LONG_TIME). get(Pid, Key) -> gen_server:call(Pid, {get, Key}, ?LONG_TIME). @@ -69,7 +72,7 @@ get(Pid, Key) -> init([PageSize, SequencerPid, Proj, CallbackMod]) -> LastLPN = find_last_lpn(SequencerPid), {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), - I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod), + I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod, false), {ok, #state{page_size=PageSize, seq=SequencerPid, proj=Proj, @@ -79,28 +82,36 @@ init([PageSize, SequencerPid, Proj, CallbackMod]) -> back_ps=BackPs, i_state=I_State}}. -handle_call({put, _Key, _Val}=Op, _From, +handle_call({new, Key}, From, #state{proj=Proj0, cb_mod=CallbackMod, page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) -> - {Res, I_State2, Proj1, LPN, NewBackPs} = + Op = {new_oid, Key, From}, + {_Res, I_State2, Proj1, LPN, NewBackPs} = CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, Proj0, PageSize, BackPs), - {reply, Res, State#state{i_state=I_State2, - proj=Proj1, - last_write_lpn=LPN, - back_ps=NewBackPs}}; + %% Let's see how much trouble we can get outselves in here. + %% If we're here, then we've written to the log without error. + %% So then the cast to roll forward must see that log entry + %% (if it also operates without error). So, the side-effect of + %% the op ought to always send a reply to the client. + gen_server:cast(self(), {roll_forward}), + {noreply, State#state{i_state=I_State2, + proj=Proj1, + last_write_lpn=LPN, + back_ps=NewBackPs}}; handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) -> - {Pages, State2} = fetch_unread_pages(State), - %% ?D(Pages), - #state{i_state=I_State} = State3 = play_log_pages(Pages, State2), + State2 = #state{i_state=I_State} = roll_log_forward(State), Reply = CallbackMod:do_pure_op(Op, I_State), - {reply, Reply, State3}; + {reply, Reply, State2}; handle_call({stop}, _From, State) -> {stop, normal, ok, State}; handle_call(_Request, _From, State) -> Reply = whaaaaaaaaaaaa, {reply, Reply, State}. +handle_cast({roll_forward}, State) -> + State2 = roll_log_forward(State), + {noreply, State2}; handle_cast(_Msg, State) -> {noreply, State}. @@ -142,9 +153,60 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN) -> [], LPNs), {BackPs, Pages}. -play_log_pages(Pages, #state{cb_mod=CallbackMod, i_state=I_State} = State) -> - I_State2 = play_log_pages(Pages, I_State, CallbackMod), +play_log_pages(Pages, SideEffectsP, + #state{cb_mod=CallbackMod, i_state=I_State} = State) -> + I_State2 = play_log_pages(Pages, I_State, CallbackMod, SideEffectsP), State#state{i_state=I_State2}. -play_log_pages(Pages, I_State, CallbackMod) -> - CallbackMod:play_log_mutate_i_state(Pages, I_State). +play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) -> + CallbackMod:play_log_mutate_i_state(Pages, SideEffectsP, I_State). + +roll_log_forward(State) -> + {Pages, State2} = fetch_unread_pages(State), + play_log_pages(Pages, true, State2). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(oid_map, { + next :: non_neg_integer(), + map :: dict() + }). + +-define(DICTMOD, dict). + +fresh() -> + #oid_map{next=1, + map=?DICTMOD:new()}. + +do_pure_op({get, Key}, #oid_map{map=Dict}) -> + ?DICTMOD:find(Key, Dict). + +do_dirty_op({new_oid, _Key, _From}=Op, + I_State, StreamNum, Proj0, PageSize, BackPs) -> + Page = term_to_binary(Op), + FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize), + {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage), + NewBackPs = tango:add_back_pointer(BackPs, LPN), + {ok, I_State, Proj1, LPN, NewBackPs}. + +play_log_mutate_i_state(Pages, SideEffectsP, I_State) -> + lists:foldl(fun({new_oid, Key, From}=_Op, #oid_map{map=Dict, next=Next}=O) -> + {Res, O2} = + case ?DICTMOD:find(Key, Dict) of + error -> + Dict2 = ?DICTMOD:store(Key, Next, Dict), + {{ok, Next},O#oid_map{map=Dict2, + next=Next + 1}}; + {ok, _} -> + {already_exists, O} + end, + if SideEffectsP -> + gen_server:reply(From, Res); + true -> + ok + end, + O2 + end, + I_State, + [binary_to_term(Page) || Page <- Pages]). + diff --git a/prototype/tango-prototype/test/tango_oid_test.erl b/prototype/tango-prototype/test/tango_oid_test.erl index b19270d..3b1e878 100644 --- a/prototype/tango-prototype/test/tango_oid_test.erl +++ b/prototype/tango-prototype/test/tango_oid_test.erl @@ -36,7 +36,7 @@ tango_oid_smoke_test() -> fun tango_oid_smoke_test_int/3). tango_oid_smoke_test_int(PageSize, Seq, Proj) -> - {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map), + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid), ok = tango_oid:stop(OID_Map), ok. @@ -46,18 +46,22 @@ tango_oid_one_test() -> fun tango_oid_one_test_int/3). tango_oid_one_test_int(PageSize, Seq, Proj) -> - {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map), + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_oid), try K1 = foo, - V1 = bar1, - V2 = bar2, - ok = tango_oid:put(OID_Map, K1, V1), - {ok, V1} = tango_oid:get(OID_Map, K1), - ok = tango_oid:put(OID_Map, K1, V2), - {ok, V2} = tango_oid:get(OID_Map, K1), + K2 = bar, + OID_Num1 = 1, + {ok, OID_Num1} = tango_oid:new(OID_Map, K1), + {ok, OID_Num1} = tango_oid:get(OID_Map, K1), + already_exists = tango_oid:new(OID_Map, K1), + %% The V2 put should *not* have clobbered the previous value + {ok, OID_Num1} = tango_oid:get(OID_Map, K1), error = tango_oid:get(OID_Map, "does not exist"), + {ok, OID_Num2} = tango_oid:new(OID_Map, K2), + {ok, OID_Num2} = tango_oid:get(OID_Map, K2), + ok after tango_oid:stop(OID_Map)