WIP: tango_oid refactoring, all broken: infinite loop
This commit is contained in:
parent
9a3ac02413
commit
a0bb7ee23d
3 changed files with 33 additions and 13 deletions
|
@ -38,28 +38,28 @@
|
|||
|
||||
%% TODO: for version 2: add strong checksum
|
||||
|
||||
pack_v1(StreamList, Options, Page, PageSize)
|
||||
when is_list(StreamList), is_list(Options), is_binary(Page),
|
||||
pack_v1(Stream_BackPs, Options, Page, PageSize)
|
||||
when is_list(Stream_BackPs), is_list(Options), is_binary(Page),
|
||||
is_integer(PageSize), PageSize > 0 ->
|
||||
StreamListBin = term_to_binary(StreamList),
|
||||
StreamListSize = byte_size(StreamListBin),
|
||||
Stream_BackPsBin = term_to_binary(Stream_BackPs),
|
||||
Stream_BackPsSize = byte_size(Stream_BackPsBin),
|
||||
OptionsInt = convert_options_list2int(Options),
|
||||
PageActualSize = byte_size(Page),
|
||||
pad_bin(PageSize,
|
||||
list_to_binary([<<?MAGIC_NUMBER_V1:32/big>>,
|
||||
<<OptionsInt:8/big>>,
|
||||
<<StreamListSize:16/big>>,
|
||||
StreamListBin,
|
||||
<<Stream_BackPsSize:16/big>>,
|
||||
Stream_BackPsBin,
|
||||
<<PageActualSize:16/big>>,
|
||||
Page])).
|
||||
|
||||
unpack_v1(<<?MAGIC_NUMBER_V1:32/big,
|
||||
_Options:8/big,
|
||||
StreamListSize:16/big, StreamListBin:StreamListSize/binary,
|
||||
Stream_BackPsSize:16/big, Stream_BackPsBin:Stream_BackPsSize/binary,
|
||||
PageActualSize:16/big, Page:PageActualSize/binary,
|
||||
_/binary>>, Part) ->
|
||||
if Part == stream_list ->
|
||||
binary_to_term(StreamListBin);
|
||||
binary_to_term(Stream_BackPsBin);
|
||||
Part == page ->
|
||||
Page
|
||||
end.
|
||||
|
@ -102,11 +102,12 @@ scan_backward2(_Proj, _Stream, LastLPN, StopAtLPN, _NumPages, _WithPagesP)
|
|||
when LastLPN =< StopAtLPN; LastLPN =< 0 ->
|
||||
[];
|
||||
scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
||||
%% ?D({scan, lastlpn, LastLPN}),
|
||||
?D({scan, Stream, LastLPN}),
|
||||
case corfurl:read_page(Proj, LastLPN) of
|
||||
{ok, FullPage} ->
|
||||
%% ?D({scan, LastLPN, ok}),
|
||||
%% ?D({scan, Stream, unpack_v1(FullPage, stream_list)}),
|
||||
?D({scan, LastLPN, ok}),
|
||||
?D({scan, Stream, unpack_v1(FullPage, stream_list)}),
|
||||
?D(proplists:get_value(Stream, unpack_v1(FullPage, stream_list))),
|
||||
case proplists:get_value(Stream, unpack_v1(FullPage, stream_list)) of
|
||||
undefined ->
|
||||
if NumPages == 0 ->
|
||||
|
@ -123,11 +124,14 @@ scan_backward2(Proj, Stream, LastLPN, StopAtLPN, NumPages, WithPagesP) ->
|
|||
end;
|
||||
[] ->
|
||||
if WithPagesP ->
|
||||
?D(?LINE),
|
||||
[{LastLPN, unpack_v1(FullPage, page)}];
|
||||
true ->
|
||||
?D(?LINE),
|
||||
[LastLPN]
|
||||
end;
|
||||
BackPs ->
|
||||
?D(?LINE),
|
||||
if WithPagesP ->
|
||||
%% ?D({bummer, BackPs}),
|
||||
[{LastLPN, unpack_v1(FullPage, page)}|
|
||||
|
@ -163,7 +167,8 @@ append_page(#proj{seq={Sequencer,_,_}, page_size=PageSize} = Proj,
|
|||
{ok, LPN, BackPsList} = corfurl_sequencer:get_tails(Sequencer, 1,
|
||||
StreamList),
|
||||
%% pulse_tracing_add(write, LPN),
|
||||
Page = tango:pack_v1(StreamList, [to_final_page],
|
||||
StreamBackPs = lists:zip(StreamList, BackPsList),
|
||||
Page = tango:pack_v1(StreamBackPs, [to_final_page],
|
||||
OrigPage, PageSize),
|
||||
append_page1(Proj, LPN, Page, StreamList, 5, OrigPage)
|
||||
catch
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
|
||||
-define(LONG_TIME, 30*1000).
|
||||
|
||||
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||
-define(D(X), io:format(user, "Dbg: ~s = ~p\n", [??X, X])).
|
||||
|
||||
-type lpn() :: non_neg_integer().
|
||||
|
||||
|
@ -143,6 +143,7 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN)
|
|||
%% ?D({fetch_unread_pages, LastLPN, StopAtLPN}),
|
||||
LPNandPages = tango:scan_backward(Proj, ?OID_STREAM_NUMBER, LastLPN,
|
||||
StopAtLPN, true),
|
||||
?D(LPNandPages),
|
||||
{_LPNs, _Pages} = lists:unzip(LPNandPages).
|
||||
|
||||
play_log_pages(Pages, SideEffectsP,
|
||||
|
@ -155,8 +156,11 @@ play_log_pages(Pages, I_State, CallbackMod, SideEffectsP) ->
|
|||
|
||||
roll_log_forward(#state{seq=SequencerPid, proj=Proj, all_back_ps=BackPs,
|
||||
last_fetch_lpn=StopAtLPN} = State) ->
|
||||
?D(StopAtLPN),
|
||||
LastLPN = find_last_lpn(SequencerPid),
|
||||
?D(LastLPN),
|
||||
{LPNs, Pages} = fetch_unread_pages(Proj, LastLPN, StopAtLPN),
|
||||
?D(LPNs),
|
||||
NewBPs = append_lpns(LPNs, BackPs),
|
||||
play_log_pages(Pages, true, State#state{all_back_ps=NewBPs}).
|
||||
|
||||
|
@ -193,15 +197,19 @@ play_log_mutate_i_state(Pages, SideEffectsP, I_State) ->
|
|||
{Res, O2} =
|
||||
case ?DICTMOD:find(Key, Dict) of
|
||||
error ->
|
||||
?D(?LINE),
|
||||
Dict2 = ?DICTMOD:store(Key, Next, Dict),
|
||||
{{ok, Next},O#oid_map{map=Dict2,
|
||||
next=Next + 1}};
|
||||
{ok, _} ->
|
||||
?D(?LINE),
|
||||
{already_exists, O}
|
||||
end,
|
||||
if SideEffectsP ->
|
||||
?D(?LINE),
|
||||
gen_server:reply(From, Res);
|
||||
true ->
|
||||
?D(?LINE),
|
||||
ok
|
||||
end,
|
||||
O2
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
-endif.
|
||||
-endif.
|
||||
|
||||
-define(D(X), io:format(user, "Dbg: ~s = ~p\n", [??X, X])).
|
||||
|
||||
-ifdef(TEST).
|
||||
-ifndef(PULSE).
|
||||
|
||||
|
@ -52,8 +54,13 @@ tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
|||
K1 = foo,
|
||||
K2 = bar,
|
||||
OID_Num1 = 1,
|
||||
error = tango_oid:get(OID_Map, "does not exist"),
|
||||
|
||||
?D(?LINE),
|
||||
{ok, OID_Num1} = tango_oid:new(OID_Map, K1),
|
||||
?D(?LINE),
|
||||
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||
?D(?LINE),
|
||||
already_exists = tango_oid:new(OID_Map, K1),
|
||||
%% The V2 put should *not* have clobbered the previous value
|
||||
{ok, OID_Num1} = tango_oid:get(OID_Map, K1),
|
||||
|
|
Loading…
Reference in a new issue