diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index 726673b..0325cd9 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -23,7 +23,7 @@ -behaviour(gen_server). %% API --export([start_link/3, stop/1, +-export([start_link/4, stop/1, put/3, get/2]). %% gen_server callbacks @@ -41,17 +41,19 @@ -type lpn() :: non_neg_integer(). -record(state, { - page_size :: non_neg_integer(), - seq :: pid(), - proj :: term(), - last_read_lpn :: lpn(), + page_size :: non_neg_integer(), % CORFU page size + seq :: pid(), % sequencer pid + proj :: term(), % projection + cb_mod :: atom(), % callback module + last_read_lpn :: lpn(), % last_write_lpn :: lpn(), - back_ps :: [lpn()], - i_state :: orddict:orddict() + back_ps :: [lpn()], % back pointers (up to 4) + i_state :: orddict:orddict() % internal state thingie }). -start_link(PageSize, SequencerPid, Proj) -> - gen_server:start_link(?MODULE, [PageSize, SequencerPid, Proj], []). +start_link(PageSize, SequencerPid, Proj, CallbackMod) -> + gen_server:start_link(?MODULE, + [PageSize, SequencerPid, Proj, CallbackMod], []). stop(Pid) -> gen_server:call(Pid, {stop}, ?LONG_TIME). @@ -64,32 +66,35 @@ get(Pid, Key) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -init([PageSize, SequencerPid, Proj]) -> +init([PageSize, SequencerPid, Proj, CallbackMod]) -> LastLPN = find_last_lpn(SequencerPid), {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), - Dict = play_log_pages(Pages, orddict:new()), + I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod), {ok, #state{page_size=PageSize, seq=SequencerPid, proj=Proj, + cb_mod=CallbackMod, last_read_lpn=LastLPN, last_write_lpn=LastLPN, back_ps=BackPs, - i_state=Dict}}. + i_state=I_State}}. -handle_call({put, Key, Val}, _From, - #state{proj=Proj0, page_size=PageSize, back_ps=BackPs} = State) -> - Page = term_to_binary({put, Key, Val}), - FullPage = tango:pack_v1([{?OID_STREAM_NUMBER, BackPs}], Page, PageSize), - {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage), - NewBackPs = tango:add_back_pointer(BackPs, LPN), - {reply, garrr, State#state{proj=Proj1, - last_write_lpn=LPN, - back_ps=NewBackPs}}; -handle_call({get, Key}, _From, State) -> +handle_call({put, _Key, _Val}=Op, _From, + #state{proj=Proj0, cb_mod=CallbackMod, + page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) -> + {Res, I_State2, Proj1, LPN, NewBackPs} = + CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER, + Proj0, PageSize, BackPs), + {reply, Res, State#state{i_state=I_State2, + proj=Proj1, + last_write_lpn=LPN, + back_ps=NewBackPs}}; +handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) -> {Pages, State2} = fetch_unread_pages(State), %% ?D(Pages), - #state{i_state=Dict} = State3 = play_log_pages(Pages, State2), - {reply, orddict:find(Key, Dict), State3}; + #state{i_state=I_State} = State3 = play_log_pages(Pages, State2), + Reply = CallbackMod:do_pure_op(Op, I_State), + {reply, Reply, State3}; handle_call({stop}, _From, State) -> {stop, normal, ok, State}; handle_call(_Request, _From, State) -> @@ -137,14 +142,9 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN) -> [], LPNs), {BackPs, Pages}. -play_log_pages(Pages, #state{i_state=Dict} = State) -> - Dict2 = play_log_pages(Pages, Dict), - State#state{i_state=Dict2}; -play_log_pages(Pages, Dict) -> - %% ?D({play_log_pages, Pages}), - lists:foldl(fun({put, K, V}=_Op, D) -> - %% ?D(_Op), - orddict:store(K, V, D) - end, - Dict, - [binary_to_term(Page) || Page <- Pages]). +play_log_pages(Pages, #state{cb_mod=CallbackMod, i_state=I_State} = State) -> + I_State2 = play_log_pages(Pages, I_State, CallbackMod), + State#state{i_state=I_State2}. + +play_log_pages(Pages, I_State, CallbackMod) -> + CallbackMod:play_log_mutate_i_state(Pages, I_State). diff --git a/prototype/tango-prototype/test/tango_oid_test.erl b/prototype/tango-prototype/test/tango_oid_test.erl index 39d37dc..b19270d 100644 --- a/prototype/tango-prototype/test/tango_oid_test.erl +++ b/prototype/tango-prototype/test/tango_oid_test.erl @@ -36,7 +36,7 @@ tango_oid_smoke_test() -> fun tango_oid_smoke_test_int/3). tango_oid_smoke_test_int(PageSize, Seq, Proj) -> - {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map), ok = tango_oid:stop(OID_Map), ok. @@ -46,15 +46,15 @@ tango_oid_one_test() -> fun tango_oid_one_test_int/3). tango_oid_one_test_int(PageSize, Seq, Proj) -> - {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map), try K1 = foo, V1 = bar1, V2 = bar2, - garrr = tango_oid:put(OID_Map, K1, V1), + ok = tango_oid:put(OID_Map, K1, V1), {ok, V1} = tango_oid:get(OID_Map, K1), - garrr = tango_oid:put(OID_Map, K1, V2), + ok = tango_oid:put(OID_Map, K1, V2), {ok, V2} = tango_oid:get(OID_Map, K1), error = tango_oid:get(OID_Map, "does not exist"),