WIP: tango_oid_test now passes
This commit is contained in:
parent
a0bb7ee23d
commit
0b3bb3ee7c
4 changed files with 5 additions and 23 deletions
|
@ -123,17 +123,17 @@ handle_call({get, NumPages, LC}, _From,
|
||||||
handle_call({get_tails, NumPages, StreamList, LC}, _From, MLP_tuple) ->
|
handle_call({get_tails, NumPages, StreamList, LC}, _From, MLP_tuple) ->
|
||||||
Tab = element(1, MLP_tuple),
|
Tab = element(1, MLP_tuple),
|
||||||
MLP = element(2, MLP_tuple),
|
MLP = element(2, MLP_tuple),
|
||||||
if NumPages > 0 ->
|
|
||||||
update_stream_tails(Tab, StreamList, MLP);
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of
|
Tails = [case (catch ets:lookup_element(Tab, Stream, 2)) of
|
||||||
{'EXIT', _} ->
|
{'EXIT', _} ->
|
||||||
[];
|
[];
|
||||||
Res ->
|
Res ->
|
||||||
Res
|
Res
|
||||||
end || Stream <- StreamList],
|
end || Stream <- StreamList],
|
||||||
|
if NumPages > 0 ->
|
||||||
|
update_stream_tails(Tab, StreamList, MLP);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
{reply, {{ok, MLP, Tails}, NewLC},
|
{reply, {{ok, MLP, Tails}, NewLC},
|
||||||
setelement(2, MLP_tuple, MLP + NumPages)};
|
setelement(2, MLP_tuple, MLP + NumPages)};
|
||||||
|
|
|
@ -102,12 +102,8 @@ scan_backward2(_Proj, _Stream, LastLPN, StopAtLPN, _NumPages, _WithPagesP)
|
||||||
when LastLPN =< StopAtLPN; LastLPN =< 0 ->
|
when LastLPN =< StopAtLPN; LastLPN =< 0 ->
|
||||||
[];
|
[];
|
||||||
scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
?D({scan, Stream, LastLPN}),
|
|
||||||
case corfurl:read_page(Proj, LastLPN) of
|
case corfurl:read_page(Proj, LastLPN) of
|
||||||
{ok, FullPage} ->
|
{ok, FullPage} ->
|
||||||
?D({scan, LastLPN, ok}),
|
|
||||||
?D({scan, Stream, unpack_v1(FullPage, stream_list)}),
|
|
||||||
?D(proplists:get_value(Stream, unpack_v1(FullPage, stream_list))),
|
|
||||||
case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of
|
case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of
|
||||||
undefined ->
|
undefined ->
|
||||||
if NumPages == 0 ->
|
if NumPages == 0 ->
|
||||||
|
@ -124,14 +120,11 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||||
end;
|
end;
|
||||||
[] ->
|
[] ->
|
||||||
if WithPagesP ->
|
if WithPagesP ->
|
||||||
?D(?LINE),
|
|
||||||
[{LastLPN, unpack_v1(FullPage, page)}];
|
[{LastLPN, unpack_v1(FullPage, page)}];
|
||||||
true ->
|
true ->
|
||||||
?D(?LINE),
|
|
||||||
[LastLPN]
|
[LastLPN]
|
||||||
end;
|
end;
|
||||||
BackPs ->
|
BackPs ->
|
||||||
?D(?LINE),
|
|
||||||
if WithPagesP ->
|
if WithPagesP ->
|
||||||
%% ?D({bummer, BackPs}),
|
%% ?D({bummer, BackPs}),
|
||||||
[{LastLPN, unpack_v1(FullPage, page)}|
|
[{LastLPN, unpack_v1(FullPage, page)}|
|
||||||
|
|
|
@ -143,7 +143,6 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN)
|
||||||
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
||||||
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
||||||
StopAtLPN, true),
|
StopAtLPN, true),
|
||||||
?D(LPNandPages),
|
|
||||||
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
||||||
|
|
||||||
play_log_pages(Pages, SideEffectsP,
|
play_log_pages(Pages, SideEffectsP,
|
||||||
|
@ -156,11 +155,8 @@ play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
||||||
|
|
||||||
roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
|
roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
|
||||||
last_fetch_lpn=StopAtLPN} = State) ->
|
last_fetch_lpn=StopAtLPN} = State) ->
|
||||||
?D(StopAtLPN),
|
|
||||||
LastLPN = find_last_lpn(SequencerPid),
|
LastLPN = find_last_lpn(SequencerPid),
|
||||||
?D(LastLPN),
|
|
||||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
||||||
?D(LPNs),
|
|
||||||
NewBPs = append_lpns(LPNs, BackPs),
|
NewBPs = append_lpns(LPNs, BackPs),
|
||||||
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
||||||
|
|
||||||
|
@ -197,19 +193,15 @@ play_log_mutate_i_state(Pages, SideEffectsP, I_State) ->
|
||||||
{Res, O2} =
|
{Res, O2} =
|
||||||
case ?DICTMOD:find(Key, Dict) of
|
case ?DICTMOD:find(Key, Dict) of
|
||||||
error ->
|
error ->
|
||||||
?D(?LINE),
|
|
||||||
Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
||||||
{{ok, Next},O#oid_map{map=Dict2,
|
{{ok, Next},O#oid_map{map=Dict2,
|
||||||
next=Next + 1}};
|
next=Next + 1}};
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
?D(?LINE),
|
|
||||||
{already_exists, O}
|
{already_exists, O}
|
||||||
end,
|
end,
|
||||||
if SideEffectsP ->
|
if SideEffectsP ->
|
||||||
?D(?LINE),
|
|
||||||
gen_server:reply(From, Res);
|
gen_server:reply(From, Res);
|
||||||
true ->
|
true ->
|
||||||
?D(?LINE),
|
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
O2
|
O2
|
||||||
|
|
|
@ -56,11 +56,8 @@ tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
||||||
OID_Num1 = 1,
|
OID_Num1 = 1,
|
||||||
error = tango_oid:get(OID_Map, "does not exist"),
|
error = tango_oid:get(OID_Map, "does not exist"),
|
||||||
|
|
||||||
?D(?LINE),
|
|
||||||
{ok, OID_Num1} = tango_oid:new(OID_Map, K1),
|
{ok, OID_Num1} = tango_oid:new(OID_Map, K1),
|
||||||
?D(?LINE),
|
|
||||||
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||||
?D(?LINE),
|
|
||||||
already_exists = tango_oid:new(OID_Map, K1),
|
already_exists = tango_oid:new(OID_Map, K1),
|
||||||
%% The V2 put should *not* have clobbered the previous value
|
%% The V2 put should *not* have clobbered the previous value
|
||||||
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||||
|
|
Loading…
Reference in a new issue