Put the sequencer pid inside the projection

This commit is contained in:
Scott Lystig Fritchie 2014-02-26 16:59:28 +09:00
parent a64a09338d
commit d5091358ff
5 changed files with 47 additions and 31 deletions

View file

@ -22,6 +22,8 @@
-type flu() :: pid() | flu_name().
-type flu_chain() :: [flu()].
-type seq_name() :: {'undefined' | pid(), atom(), atom()}.
-record(range, {
pn_start :: non_neg_integer(), % start page number
pn_end :: non_neg_integer(), % end page number
@ -30,6 +32,7 @@
-record(proj, { % Projection
epoch :: non_neg_integer(),
seq :: 'undefined' | seq_name(),
r :: [#range{}]
}).

View file

@ -24,7 +24,7 @@
new_range/3,
read_projection/2,
save_projection/2]).
-export([append_page/3, read_page/2, scan_forward/3,
-export([append_page/2, read_page/2, scan_forward/3,
fill_page/2, trim_page/2]).
-include("corfurl.hrl").
@ -40,10 +40,17 @@
-define(EVENT_LOG(X), ok).
%%% -define(EVENT_LOG(X), event_logger:event(X)).
append_page(Sequencer, P, Page) ->
append_page(Sequencer, P, Page, 1).
append_page(P, Page) ->
append_page(P, Page, 1).
append_page(Sequencer, P, Page, Retries) when Retries < 50 ->
append_page(#proj{seq={undefined, SeqHost, SeqName}} = P, Page, Retries) ->
case rpc:call(SeqHost, erlang, whereis, [SeqName]) of
SeqPid when is_pid(SeqPid) ->
append_page(P#proj{seq={SeqPid, SeqHost, SeqName}}, Page, Retries);
Else ->
exit({bummer, mod, ?MODULE, line, ?LINE, error, Else})
end;
append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries) when Retries < 50 ->
case corfurl_sequencer:get(Sequencer, 1) of
LPN when is_integer(LPN) ->
case write_single_page(P, LPN, Page) of
@ -51,7 +58,7 @@ append_page(Sequencer, P, Page, Retries) when Retries < 50 ->
{ok, LPN};
X when X == error_overwritten; X == error_trimmed ->
report_lost_race(LPN, X),
append_page(Sequencer, P, Page);
append_page(P, Page);
{special_trimmed, LPN}=XX ->
XX;
Else ->
@ -59,7 +66,7 @@ append_page(Sequencer, P, Page, Retries) when Retries < 50 ->
end;
_ ->
timer:sleep(Retries), % TODO naive
append_page(Sequencer, P, Page, Retries * 2)
append_page(P, Page, Retries * 2)
end.
write_single_page(#proj{epoch=Epoch} = P, LPN, Page) ->

View file

@ -45,7 +45,10 @@ start_link(FLUs) ->
start_link(FLUs, standard).
start_link(FLUs, SeqType) ->
gen_server:start_link(?MODULE, {FLUs, SeqType}, []).
start_link(FLUs, SeqType, ?SERVER).
start_link(FLUs, SeqType, RegName) ->
gen_server:start_link({local, RegName}, ?MODULE, {FLUs, SeqType}, []).
stop(Pid) ->
gen_server:call(Pid, stop, infinity).

View file

@ -50,7 +50,6 @@
-define(MY_KEY, ?MY_TAB).
-record(run, {
seq, % Sequencer
proj, % Projection
flus % List of FLUs
}).
@ -607,11 +606,12 @@ zipwith(F, [X|Xs], [Y|Ys]) ->
[F(X, Y)|zipwith(F, Xs, Ys)];
zipwith(_, _, _) -> [].
clean_up_runtime(R) ->
clean_up_runtime(#run{flus=Flus, proj=P}) ->
%% io:format(user, "clean_up_runtime: run = ~p\n", [R]),
catch corfurl_sequencer:stop(R#run.seq),
[catch corfurl_flu:stop(F) || F <- R#run.flus],
corfurl_test:setup_del_all(length(R#run.flus)).
#proj{seq={Seq,_,_}} = P,
catch corfurl_sequencer:stop(Seq),
[catch corfurl_flu:stop(F) || F <- Flus],
corfurl_test:setup_del_all(length(Flus)).
make_chains(ChainLen, FLUs) ->
make_chains(ChainLen, FLUs, [], []).
@ -633,7 +633,8 @@ setup(NumChains, ChainLen, PageSize, SeqType) ->
Chains = make_chains(ChainLen, FLUs),
%% io:format(user, "Cs = ~p\n", [Chains]),
Proj = corfurl:new_simple_projection(1, 1, ?MAX_PAGES, Chains),
Run = #run{seq=Seq, proj=Proj, flus=FLUs},
Run = #run{proj=Proj#proj{seq={Seq, node(), 'corfurl pulse seq thingie'}},
flus=FLUs},
ets:insert(?MY_TAB, {?MY_KEY, Run}),
Run.
@ -688,7 +689,7 @@ log_make_result(Result) ->
log_make_result(Pid, Result) ->
{result, Pid, Result}.
pick_an_LPN(Seq, SeedInt) ->
pick_an_LPN(#proj{seq={Seq,_,_}}, SeedInt) ->
Max = corfurl_sequencer:get(Seq, 0),
%% The sequencer may be lying to us, shouganai.
if SeedInt > Max -> (SeedInt rem Max) + 1;
@ -701,12 +702,12 @@ pick_an_LPN(Seq, SeedInt) ->
event_logger:event(log_make_result(LOG__Result), lamport_clock:get()),
LOG__Result).
append(#run{seq=Seq, proj=Proj}, Page) ->
append(#run{proj=Proj}, Page) ->
lamport_clock:init(),
lamport_clock:incr(),
?LOG({append, Page},
begin
Res = corfurl:append_page(Seq, Proj, Page),
Res = corfurl:append_page(Proj, Page),
perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page)
end).
@ -715,21 +716,21 @@ read_result_mangle({ok, Page}) ->
read_result_mangle(Else) ->
Else.
read_approx(#run{seq=Seq, proj=Proj}, SeedInt) ->
read_approx(#run{proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({read, LPN},
begin
Res = read_result_mangle(corfurl:read_page(Proj, LPN)),
perhaps_trip_read_approx(?TRIP_bad_read, Res, LPN)
end).
scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) ->
scan_forward(#run{proj=Proj}, SeedInt, NumPages) ->
lamport_clock:init(),
lamport_clock:incr(),
StartLPN = if SeedInt == 1 -> 1;
true -> pick_an_LPN(Seq, SeedInt)
true -> pick_an_LPN(Proj, SeedInt)
end,
%% Our job is complicated by the ?LOG() macro, which isn't good enough
%% for our purpose: we must lie about the starting timestamp, to make
@ -757,20 +758,20 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) ->
end
end).
fill(#run{seq=Seq, proj=Proj}, SeedInt) ->
fill(#run{proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({fill, LPN},
begin
Res = corfurl:fill_page(Proj, LPN),
perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN)
end).
trim(#run{seq=Seq, proj=Proj}, SeedInt) ->
trim(#run{proj=Proj}, SeedInt) ->
lamport_clock:init(),
lamport_clock:incr(),
LPN = pick_an_LPN(Seq, SeedInt),
LPN = pick_an_LPN(Proj, SeedInt),
?LOG({trim, LPN},
begin
Res = corfurl:trim_page(Proj, LPN),

View file

@ -83,8 +83,9 @@ smoke1_test() ->
lists:flatten(io_lib:format("~8..0w", [X])))} ||
X <- lists:seq(1, 5)],
try
P1 = ?M:new_simple_projection(1, 1, 1*100, [[F1, F2, F3], [F4, F5, F6]]),
[begin {ok, LPN} = ?M:append_page(Seq, P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
P0 = ?M:new_simple_projection(1, 1, 1*100, [[F1, F2, F3], [F4, F5, F6]]),
P1 = P0#proj{seq={Seq, unused, unused}},
[begin {ok, LPN} = ?M:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs],
[begin {ok, Pg} = ?M:read_page(P1, LPN) end || {LPN, Pg} <- LPN_Pgs],
@ -153,11 +154,11 @@ forfun_test_() ->
[forfun(Procs) || Procs <- [10,100,1000,5000]]
end}.
forfun_append(0, _Seq, _P, _Page) ->
forfun_append(0, _P, _Page) ->
ok;
forfun_append(N, Seq, P, Page) ->
forfun_append(N, #proj{seq={Seq, _, _}} = P, Page) ->
{ok, _} = ?M:append_page(Seq, P, Page),
forfun_append(N - 1, Seq, P, Page).
forfun_append(N - 1, P, Page).
%%% My MBP, SSD
%%% The 1K and 5K procs shows full-mailbox-scan ickiness
@ -191,13 +192,14 @@ forfun(NumProcs) ->
try
Chains = [[F1, F2], [F3, F4]],
%%Chains = [[F1], [F2], [F3], [F4]],
P = ?M:new_simple_projection(1, 1, NumPages*2, Chains),
P0 = ?M:new_simple_projection(1, 1, NumPages*2, Chains),
P = P0#proj{seq={Seq, unused, unused}},
Me = self(),
Start = now(),
Ws = [begin
Page = <<X:(PageSize*8)>>,
spawn_link(fun() ->
forfun_append(PagesPerProc, Seq, P, Page),
forfun_append(PagesPerProc, P, Page),
Me ! {done, self()}
end)
end || X <- lists:seq(1, NumProcs)],