Add missing func corfurl_client:append_page/3, then fix tango_dt_register_test
This commit is contained in:
parent
cdeddbb582
commit
c068057c96
4 changed files with 21 additions and 13 deletions
|
@ -20,7 +20,8 @@
|
||||||
|
|
||||||
-module(corfurl_client).
|
-module(corfurl_client).
|
||||||
|
|
||||||
-export([append_page/2, read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
|
-export([append_page/2, append_page/3,
|
||||||
|
read_page/2, fill_page/2, trim_page/2, scan_forward/3]).
|
||||||
-export([restart_sequencer/1]).
|
-export([restart_sequencer/1]).
|
||||||
%% For debugging/verification only
|
%% For debugging/verification only
|
||||||
-export([pulse_tracing_start/1, pulse_tracing_add/2, pulse_tracing_get/1]).
|
-export([pulse_tracing_start/1, pulse_tracing_add/2, pulse_tracing_get/1]).
|
||||||
|
@ -31,33 +32,36 @@
|
||||||
%% -define(LONG_TIME, 30*1000).
|
%% -define(LONG_TIME, 30*1000).
|
||||||
|
|
||||||
append_page(Proj, Page) ->
|
append_page(Proj, Page) ->
|
||||||
append_page(Proj, Page, 5).
|
append_page(Proj, Page, []).
|
||||||
|
|
||||||
append_page(Proj, _Page, 0) ->
|
append_page(Proj, Page, StreamList) ->
|
||||||
|
append_page(Proj, Page, StreamList, 5).
|
||||||
|
|
||||||
|
append_page(Proj, _Page, _StreamList, 0) ->
|
||||||
{{error_failed, ?MODULE, ?LINE}, Proj};
|
{{error_failed, ?MODULE, ?LINE}, Proj};
|
||||||
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, Retries) ->
|
append_page(#proj{seq={Sequencer,_,_}} = Proj, Page, StreamList, Retries) ->
|
||||||
try
|
try
|
||||||
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1),
|
{ok, LPN} = corfurl_sequencer:get(Sequencer, 1, StreamList),
|
||||||
pulse_tracing_add(write, LPN),
|
pulse_tracing_add(write, LPN),
|
||||||
append_page1(Proj, LPN, Page, 5)
|
append_page1(Proj, LPN, Page, StreamList, 5)
|
||||||
catch
|
catch
|
||||||
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
|
exit:{Reason,{_gen_server_or_pulse_gen_server,call,[Sequencer|_]}}
|
||||||
when Reason == noproc; Reason == normal ->
|
when Reason == noproc; Reason == normal ->
|
||||||
append_page(restart_sequencer(Proj), Page, Retries);
|
append_page(restart_sequencer(Proj), Page, StreamList, Retries);
|
||||||
exit:Exit ->
|
exit:Exit ->
|
||||||
{{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit}
|
{{error_failed, ?MODULE, ?LINE}, incomplete_code, Exit}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
append_page1(Proj, _LPN, _Page, 0) ->
|
append_page1(Proj, _LPN, _Page, _StreamList, 0) ->
|
||||||
{{error_failed, ?MODULE, ?LINE}, Proj};
|
{{error_failed, ?MODULE, ?LINE}, Proj};
|
||||||
append_page1(Proj, LPN, Page, Retries) ->
|
append_page1(Proj, LPN, Page, StreamList, Retries) ->
|
||||||
case append_page2(Proj, LPN, Page) of
|
case append_page2(Proj, LPN, Page) of
|
||||||
lost_race ->
|
lost_race ->
|
||||||
append_page(Proj, Page, Retries - 1);
|
append_page(Proj, Page, StreamList, Retries - 1);
|
||||||
error_badepoch ->
|
error_badepoch ->
|
||||||
case poll_for_new_epoch_projection(Proj) of
|
case poll_for_new_epoch_projection(Proj) of
|
||||||
{ok, NewProj} ->
|
{ok, NewProj} ->
|
||||||
append_page1(NewProj, LPN, Page, Retries - 1);
|
append_page1(NewProj, LPN, Page, StreamList, Retries - 1);
|
||||||
Else ->
|
Else ->
|
||||||
{Else, Proj}
|
{Else, Proj}
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -45,6 +45,8 @@
|
||||||
%% -define(LONG_TIME, 30*1000).
|
%% -define(LONG_TIME, 30*1000).
|
||||||
-define(LONG_TIME, 5*1000).
|
-define(LONG_TIME, 5*1000).
|
||||||
|
|
||||||
|
-define(D(X), io:format(user, "Dbg: ~s =\n ~p\n", [??X, X])).
|
||||||
|
|
||||||
start_link(FLUs) ->
|
start_link(FLUs) ->
|
||||||
start_link(FLUs, standard).
|
start_link(FLUs, standard).
|
||||||
|
|
||||||
|
@ -124,7 +126,7 @@ handle_call({get, NumPages, StreamList, LC}, _From,
|
||||||
{Tab, MLP + NumPages, BadPercent, MaxDifference}};
|
{Tab, MLP + NumPages, BadPercent, MaxDifference}};
|
||||||
handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) ->
|
handle_call({get_tails, StreamList, LC}, _From, MLP_tuple) ->
|
||||||
Tab = element(1, MLP_tuple),
|
Tab = element(1, MLP_tuple),
|
||||||
Tails = [ets:lookup_element(Tab, Stream, 2) || Stream <- StreamList],
|
Tails = [(catch ets:lookup_element(Tab, Stream, 2)) || Stream <- StreamList],
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
{reply, {{ok, Tails}, NewLC}, MLP_tuple};
|
{reply, {{ok, Tails}, NewLC}, MLP_tuple};
|
||||||
handle_call({set_tails, StreamTails}, _From, MLP_tuple) ->
|
handle_call({set_tails, StreamTails}, _From, MLP_tuple) ->
|
||||||
|
|
|
@ -183,7 +183,8 @@ do_dirty_op({new_oid, _Key, _From}=Op,
|
||||||
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
I_State, StreamNum, Proj0, PageSize, BackPs) ->
|
||||||
Page = term_to_binary(Op),
|
Page = term_to_binary(Op),
|
||||||
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
FullPage = tango:pack_v1([{StreamNum, BackPs}], Page, PageSize),
|
||||||
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage),
|
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage,
|
||||||
|
[StreamNum]),
|
||||||
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
||||||
{ok, I_State, Proj1, LPN, NewBackPs}.
|
{ok, I_State, Proj1, LPN, NewBackPs}.
|
||||||
|
|
||||||
|
|
|
@ -189,6 +189,7 @@ tango_dt_register_int(PageSize, Seq, Proj) ->
|
||||||
NewVal = {"Heh", "a new value"},
|
NewVal = {"Heh", "a new value"},
|
||||||
ok = tango_dt_register:set(Reg2, NewVal),
|
ok = tango_dt_register:set(Reg2, NewVal),
|
||||||
{ok, NewVal} = tango_dt_register:get(Reg2b),
|
{ok, NewVal} = tango_dt_register:get(Reg2b),
|
||||||
|
{ok, NewVal} = tango_dt_register:get(Reg2), % sanity check
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue