From be55d98bb57d31d8b6d93572d0cd31b2bec5f5bf Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 30 Aug 2014 20:10:28 +0900 Subject: [PATCH] Tango OID mapper put now passes basic unit test! --- prototype/tango-prototype/src/corfurl.erl | 6 +- prototype/tango-prototype/src/tango.erl | 8 ++ prototype/tango-prototype/src/tango_oid.erl | 96 ++++++++++++++++--- .../tango-prototype/test/tango_oid_test.erl | 28 +++++- 4 files changed, 121 insertions(+), 17 deletions(-) diff --git a/prototype/tango-prototype/src/corfurl.erl b/prototype/tango-prototype/src/corfurl.erl index 0a792e7..1212a8a 100644 --- a/prototype/tango-prototype/src/corfurl.erl +++ b/prototype/tango-prototype/src/corfurl.erl @@ -350,7 +350,11 @@ project_to_chain(LPN, P) -> [#range{pn_start=Start, pn_end=End, chains=Chains}] = P#proj.r, if Start =< LPN, LPN =< End -> I = ((LPN - Start) rem tuple_size(Chains)) + 1, - element(I, Chains) + element(I, Chains); + true -> + exit({?MODULE, project_to_chain, [{start, Start}, + {lpn, LPN}, + {'end', End}]}) end. simple_test_setup(RootDir, BaseDirName, PageSize, NumPages, NumFLUs) -> diff --git a/prototype/tango-prototype/src/tango.erl b/prototype/tango-prototype/src/tango.erl index 54e3506..e5c272d 100644 --- a/prototype/tango-prototype/src/tango.erl +++ b/prototype/tango-prototype/src/tango.erl @@ -23,6 +23,7 @@ -module(tango). -export([pack_v1/3, unpack_v1/2, + add_back_pointer/2, add_back_pointer/3, scan_backward/4, scan_backward/5, @@ -30,6 +31,8 @@ -define(MAGIC_NUMBER_V1, 16#88990011). +-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). + %% TODO: for version 2: add strong checksum pack_v1(StreamList, Page, PageSize) when is_list(StreamList), is_binary(Page) -> @@ -86,8 +89,11 @@ scan_backward2(_Proj, _Stream, LastLPN, StopAtLPN, _NumPages, _WithPagesP) when LastLPN =< StopAtLPN; LastLPN =< 0 -> []; scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) -> + %% ?D({scan, lastlpn, LastLPN}), case corfurl:read_page(Proj, LastLPN) of {ok, FullPage} -> + %% ?D({scan, LastLPN, ok}), + %% ?D({scan, Stream, unpack_v1(FullPage, stream_list)}), case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of undefined -> if NumPages == 0 -> @@ -110,6 +116,7 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) -> end; BackPs -> if WithPagesP -> + %% ?D({bummer, BackPs}), [{LastLPN, unpack_v1(FullPage, page)}| scan_backward2(Proj, Stream, hd(BackPs), StopAtLPN, NumPages + 1, @@ -126,6 +133,7 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) -> end end; Err -> + %% ?D({scan, LastLPN, Err}), Err end. diff --git a/prototype/tango-prototype/src/tango_oid.erl b/prototype/tango-prototype/src/tango_oid.erl index 5679570..726673b 100644 --- a/prototype/tango-prototype/src/tango_oid.erl +++ b/prototype/tango-prototype/src/tango_oid.erl @@ -23,7 +23,8 @@ -behaviour(gen_server). %% API --export([start_link/2, stop/1]). +-export([start_link/3, stop/1, + put/3, get/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -35,29 +36,60 @@ -define(LONG_TIME, 5*1000). %% -define(LONG_TIME, 30*1000). +-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])). + +-type lpn() :: non_neg_integer(). + -record(state, { + page_size :: non_neg_integer(), seq :: pid(), - proj :: term() + proj :: term(), + last_read_lpn :: lpn(), + last_write_lpn :: lpn(), + back_ps :: [lpn()], + i_state :: orddict:orddict() }). -start_link(SequencerPid, Proj) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [SequencerPid, Proj], []). +start_link(PageSize, SequencerPid, Proj) -> + gen_server:start_link(?MODULE, [PageSize, SequencerPid, Proj], []). stop(Pid) -> gen_server:call(Pid, {stop}, ?LONG_TIME). +put(Pid, Key, Val) -> + gen_server:call(Pid, {put, Key, Val}, ?LONG_TIME). + +get(Pid, Key) -> + gen_server:call(Pid, {get, Key}, ?LONG_TIME). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -init([SequencerPid, Proj]) -> - {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), - LastLPN = CurrentLPN - 1, -io:format(user, "LastLPN = ~p\n", [LastLPN]), - LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, false), - {_LPNS, Pages} = lists:unzip(LPNandPages), - io:format("Pages = ~p\n", [Pages]), - {ok, #state{seq=SequencerPid, - proj=Proj}}. +init([PageSize, SequencerPid, Proj]) -> + LastLPN = find_last_lpn(SequencerPid), + {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0), + Dict = play_log_pages(Pages, orddict:new()), + {ok, #state{page_size=PageSize, + seq=SequencerPid, + proj=Proj, + last_read_lpn=LastLPN, + last_write_lpn=LastLPN, + back_ps=BackPs, + i_state=Dict}}. +handle_call({put, Key, Val}, _From, + #state{proj=Proj0, page_size=PageSize, back_ps=BackPs} = State) -> + Page = term_to_binary({put, Key, Val}), + FullPage = tango:pack_v1([{?OID_STREAM_NUMBER, BackPs}], Page, PageSize), + {{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage), + NewBackPs = tango:add_back_pointer(BackPs, LPN), + {reply, garrr, State#state{proj=Proj1, + last_write_lpn=LPN, + back_ps=NewBackPs}}; +handle_call({get, Key}, _From, State) -> + {Pages, State2} = fetch_unread_pages(State), + %% ?D(Pages), + #state{i_state=Dict} = State3 = play_log_pages(Pages, State2), + {reply, orddict:find(Key, Dict), State3}; handle_call({stop}, _From, State) -> {stop, normal, ok, State}; handle_call(_Request, _From, State) -> @@ -78,3 +110,41 @@ code_change(_OldVsn, State, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +find_last_lpn(#state{seq=SequencerPid}) -> + find_last_lpn(SequencerPid); +find_last_lpn(SequencerPid) -> + {ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0), + CurrentLPN - 1. + +fetch_unread_pages(#state{seq=___FIXME_SequencerPid, proj=Proj, + last_read_lpn=StopAtLPN, + last_write_lpn=LastLPN} = State) -> + %% TODO: fixme: to handle concurrent updates correctly, we should + %% query the sequencer for the last LPN. + {BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN), + %% TODO ???? + %% LastReadLPN = if BackPs == [] -> 0; + %% true -> hd(BackPs) + %% end, + {Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}. + +fetch_unread_pages(Proj, LastLPN, StopAtLPN) -> + %% ?D({fetch_unread_pages, LastLPN, StopAtLPN}), + LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, + StopAtLPN, true), + {LPNs, Pages} = lists:unzip(LPNandPages), + BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end, + [], LPNs), + {BackPs, Pages}. + +play_log_pages(Pages, #state{i_state=Dict} = State) -> + Dict2 = play_log_pages(Pages, Dict), + State#state{i_state=Dict2}; +play_log_pages(Pages, Dict) -> + %% ?D({play_log_pages, Pages}), + lists:foldl(fun({put, K, V}=_Op, D) -> + %% ?D(_Op), + orddict:store(K, V, D) + end, + Dict, + [binary_to_term(Page) || Page <- Pages]). diff --git a/prototype/tango-prototype/test/tango_oid_test.erl b/prototype/tango-prototype/test/tango_oid_test.erl index f0f3e9a..3c31fff 100644 --- a/prototype/tango-prototype/test/tango_oid_test.erl +++ b/prototype/tango-prototype/test/tango_oid_test.erl @@ -32,8 +32,30 @@ tango_oid_smoke_test() -> ok = tango_test:run_test("/tmp", "tango_oid_smoke", 4096, 5*1024, 1, fun tango_oid_smoke_test_int/3). -tango_oid_smoke_test_int(_PageSize, Seq, Proj) -> - {ok, OID_Map} = tango_oid:start_link(Seq, Proj), - ok = tango_oid:stop(OID_Map), +tango_oid_smoke_test_int(PageSize, Seq, Proj) -> + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), + ok = tango_oid:stop(OID_Map), ok. + +tango_oid_one_test() -> + ok = tango_test:run_test("/tmp", "tango_oid_one", 4096, 5*1024, 1, + fun tango_oid_one_test_int/3). + +tango_oid_one_test_int(PageSize, Seq, Proj) -> + {ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj), + + try + K1 = foo, + V1 = bar1, + V2 = bar2, + garrr = tango_oid:put(OID_Map, K1, V1), + {ok, V1} = tango_oid:get(OID_Map, K1), + garrr = tango_oid:put(OID_Map, K1, V2), + {ok, V2} = tango_oid:get(OID_Map, K1), + error = tango_oid:get(OID_Map, "does not exist"), + + ok + after + tango_oid:stop(OID_Map) + end.