Tango OID mapper put now passes basic unit test!
This commit is contained in:
parent
52e2fa2edb
commit
be55d98bb5
4 changed files with 121 additions and 17 deletions
|
@ -350,7 +350,11 @@ project_to_chain(LPN, P) ->
|
||||||
[#range{pn_start=Start, pn_end=End, chains=Chains}] = P#proj.r,
|
[#range{pn_start=Start, pn_end=End, chains=Chains}] = P#proj.r,
|
||||||
if Start =< LPN, LPN =< End ->
|
if Start =< LPN, LPN =< End ->
|
||||||
I = ((LPN - Start) rem tuple_size(Chains)) + 1,
|
I = ((LPN - Start) rem tuple_size(Chains)) + 1,
|
||||||
element(I, Chains)
|
element(I, Chains);
|
||||||
|
true ->
|
||||||
|
exit({?MODULE, project_to_chain, [{start, Start},
|
||||||
|
{lpn, LPN},
|
||||||
|
{'end', End}]})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
simple_test_setup(RootDir, BaseDirName, PageSize, NumPages, NumFLUs) ->
|
simple_test_setup(RootDir, BaseDirName, PageSize, NumPages, NumFLUs) ->
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
-module(tango).
|
-module(tango).
|
||||||
|
|
||||||
-export([pack_v1/3, unpack_v1/2,
|
-export([pack_v1/3, unpack_v1/2,
|
||||||
|
add_back_pointer/2,
|
||||||
add_back_pointer/3,
|
add_back_pointer/3,
|
||||||
scan_backward/4,
|
scan_backward/4,
|
||||||
scan_backward/5,
|
scan_backward/5,
|
||||||
|
@ -30,6 +31,8 @@
|
||||||
|
|
||||||
-define(MAGIC_NUMBER_V1, 16#88990011).
|
-define(MAGIC_NUMBER_V1, 16#88990011).
|
||||||
|
|
||||||
|
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||||
|
|
||||||
%% TODO: for version 2: add strong checksum
|
%% TODO: for version 2: add strong checksum
|
||||||
|
|
||||||
pack_v1(StreamList, Page, PageSize) when is_list(StreamList), is_binary(Page) ->
|
pack_v1(StreamList, Page, PageSize) when is_list(StreamList), is_binary(Page) ->
|
||||||
|
@ -86,8 +89,11 @@ scan_backward2(_Proj, _Stream, LastLPN, StopAtLPN, _NumPages, _WithPagesP)
|
||||||
when LastLPN =< StopAtLPN; LastLPN =< 0 ->
|
when LastLPN =< StopAtLPN; LastLPN =< 0 ->
|
||||||
[];
|
[];
|
||||||
scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
|
%% ?D({scan, lastlpn, LastLPN}),
|
||||||
case corfurl:read_page(Proj, LastLPN) of
|
case corfurl:read_page(Proj, LastLPN) of
|
||||||
{ok, FullPage} ->
|
{ok, FullPage} ->
|
||||||
|
%% ?D({scan, LastLPN, ok}),
|
||||||
|
%% ?D({scan, Stream, unpack_v1(FullPage, stream_list)}),
|
||||||
case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of
|
case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of
|
||||||
undefined ->
|
undefined ->
|
||||||
if NumPages == 0 ->
|
if NumPages == 0 ->
|
||||||
|
@ -110,6 +116,7 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
end;
|
end;
|
||||||
BackPs ->
|
BackPs ->
|
||||||
if WithPagesP ->
|
if WithPagesP ->
|
||||||
|
%% ?D({bummer, BackPs}),
|
||||||
[{LastLPN, unpack_v1(FullPage, page)}|
|
[{LastLPN, unpack_v1(FullPage, page)}|
|
||||||
scan_backward2(Proj, Stream,
|
scan_backward2(Proj, Stream,
|
||||||
hd(BackPs), StopAtLPN, NumPages + 1,
|
hd(BackPs), StopAtLPN, NumPages + 1,
|
||||||
|
@ -126,6 +133,7 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
Err ->
|
Err ->
|
||||||
|
%% ?D({scan, LastLPN, Err}),
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,8 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, stop/1]).
|
-export([start_link/3, stop/1,
|
||||||
|
put/3, get/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
@ -35,29 +36,60 @@
|
||||||
-define(LONG_TIME, 5*1000).
|
-define(LONG_TIME, 5*1000).
|
||||||
%% -define(LONG_TIME, 30*1000).
|
%% -define(LONG_TIME, 30*1000).
|
||||||
|
|
||||||
|
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||||
|
|
||||||
|
-type lpn() :: non_neg_integer().
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
page_size :: non_neg_integer(),
|
||||||
seq :: pid(),
|
seq :: pid(),
|
||||||
proj :: term()
|
proj :: term(),
|
||||||
|
last_read_lpn :: lpn(),
|
||||||
|
last_write_lpn :: lpn(),
|
||||||
|
back_ps :: [lpn()],
|
||||||
|
i_state :: orddict:orddict()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
start_link(SequencerPid, Proj) ->
|
start_link(PageSize, SequencerPid, Proj) ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [SequencerPid, Proj], []).
|
gen_server:start_link(?MODULE, [PageSize, SequencerPid, Proj], []).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
||||||
|
|
||||||
|
put(Pid, Key, Val) ->
|
||||||
|
gen_server:call(Pid, {put, Key, Val}, ?LONG_TIME).
|
||||||
|
|
||||||
|
get(Pid, Key) ->
|
||||||
|
gen_server:call(Pid, {get, Key}, ?LONG_TIME).
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
init([SequencerPid, Proj]) ->
|
init([PageSize, SequencerPid, Proj]) ->
|
||||||
{ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
|
LastLPN = find_last_lpn(SequencerPid),
|
||||||
LastLPN = CurrentLPN - 1,
|
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
||||||
io:format(user, "LastLPN = ~p\n", [LastLPN]),
|
Dict = play_log_pages(Pages, orddict:new()),
|
||||||
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN, false),
|
{ok, #state{page_size=PageSize,
|
||||||
{_LPNS, Pages} = lists:unzip(LPNandPages),
|
seq=SequencerPid,
|
||||||
io:format("Pages = ~p\n", [Pages]),
|
proj=Proj,
|
||||||
{ok, #state{seq=SequencerPid,
|
last_read_lpn=LastLPN,
|
||||||
proj=Proj}}.
|
last_write_lpn=LastLPN,
|
||||||
|
back_ps=BackPs,
|
||||||
|
i_state=Dict}}.
|
||||||
|
|
||||||
|
handle_call({put, Key, Val}, _From,
|
||||||
|
#state{proj=Proj0, page_size=PageSize, back_ps=BackPs} = State) ->
|
||||||
|
Page = term_to_binary({put, Key, Val}),
|
||||||
|
FullPage = tango:pack_v1([{?OID_STREAM_NUMBER, BackPs}], Page, PageSize),
|
||||||
|
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage),
|
||||||
|
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
||||||
|
{reply, garrr, State#state{proj=Proj1,
|
||||||
|
last_write_lpn=LPN,
|
||||||
|
back_ps=NewBackPs}};
|
||||||
|
handle_call({get, Key}, _From, State) ->
|
||||||
|
{Pages, State2} = fetch_unread_pages(State),
|
||||||
|
%% ?D(Pages),
|
||||||
|
#state{i_state=Dict} = State3 = play_log_pages(Pages, State2),
|
||||||
|
{reply, orddict:find(Key, Dict), State3};
|
||||||
handle_call({stop}, _From, State) ->
|
handle_call({stop}, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
|
@ -78,3 +110,41 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
|
find_last_lpn(#state{seq=SequencerPid}) ->
|
||||||
|
find_last_lpn(SequencerPid);
|
||||||
|
find_last_lpn(SequencerPid) ->
|
||||||
|
{ok, CurrentLPN} = corfurl_sequencer:get(SequencerPid, 0),
|
||||||
|
CurrentLPN - 1.
|
||||||
|
|
||||||
|
fetch_unread_pages(#state{seq=___FIXME_SequencerPid, proj=Proj,
|
||||||
|
last_read_lpn=StopAtLPN,
|
||||||
|
last_write_lpn=LastLPN} = State) ->
|
||||||
|
%% TODO: fixme: to handle concurrent updates correctly, we should
|
||||||
|
%% query the sequencer for the last LPN.
|
||||||
|
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
||||||
|
%% TODO ????
|
||||||
|
%% LastReadLPN = if BackPs == [] -> 0;
|
||||||
|
%% true -> hd(BackPs)
|
||||||
|
%% end,
|
||||||
|
{Pages, State#state{last_read_lpn=LastLPN, back_ps=BackPs}}.
|
||||||
|
|
||||||
|
fetch_unread_pages(Proj, LastLPN, StopAtLPN) ->
|
||||||
|
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
||||||
|
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
||||||
|
StopAtLPN, true),
|
||||||
|
{LPNs, Pages} = lists:unzip(LPNandPages),
|
||||||
|
BackPs = lists:foldl(fun(P, BPs) -> tango:add_back_pointer(BPs, P) end,
|
||||||
|
[], LPNs),
|
||||||
|
{BackPs, Pages}.
|
||||||
|
|
||||||
|
play_log_pages(Pages, #state{i_state=Dict} = State) ->
|
||||||
|
Dict2 = play_log_pages(Pages, Dict),
|
||||||
|
State#state{i_state=Dict2};
|
||||||
|
play_log_pages(Pages, Dict) ->
|
||||||
|
%% ?D({play_log_pages, Pages}),
|
||||||
|
lists:foldl(fun({put, K, V}=_Op, D) ->
|
||||||
|
%% ?D(_Op),
|
||||||
|
orddict:store(K, V, D)
|
||||||
|
end,
|
||||||
|
Dict,
|
||||||
|
[binary_to_term(Page) || Page <- Pages]).
|
||||||
|
|
|
@ -32,8 +32,30 @@ tango_oid_smoke_test() ->
|
||||||
ok = tango_test:run_test("/tmp", "tango_oid_smoke", 4096, 5*1024, 1,
|
ok = tango_test:run_test("/tmp", "tango_oid_smoke", 4096, 5*1024, 1,
|
||||||
fun tango_oid_smoke_test_int/3).
|
fun tango_oid_smoke_test_int/3).
|
||||||
|
|
||||||
tango_oid_smoke_test_int(_PageSize, Seq, Proj) ->
|
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(Seq, Proj),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
ok = tango_oid:stop(OID_Map),
|
|
||||||
|
|
||||||
|
ok = tango_oid:stop(OID_Map),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
tango_oid_one_test() ->
|
||||||
|
ok = tango_test:run_test("/tmp", "tango_oid_one", 4096, 5*1024, 1,
|
||||||
|
fun tango_oid_one_test_int/3).
|
||||||
|
|
||||||
|
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
||||||
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
||||||
|
|
||||||
|
try
|
||||||
|
K1 = foo,
|
||||||
|
V1 = bar1,
|
||||||
|
V2 = bar2,
|
||||||
|
garrr = tango_oid:put(OID_Map, K1, V1),
|
||||||
|
{ok, V1} = tango_oid:get(OID_Map, K1),
|
||||||
|
garrr = tango_oid:put(OID_Map, K1, V2),
|
||||||
|
{ok, V2} = tango_oid:get(OID_Map, K1),
|
||||||
|
error = tango_oid:get(OID_Map, "does not exist"),
|
||||||
|
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
tango_oid:stop(OID_Map)
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in a new issue