diff --git a/prototype/corfurl/include/corfurl.hrl b/prototype/corfurl/include/corfurl.hrl index e3b2b28..8bb452e 100644 --- a/prototype/corfurl/include/corfurl.hrl +++ b/prototype/corfurl/include/corfurl.hrl @@ -22,6 +22,8 @@ -type flu() :: pid() | flu_name(). -type flu_chain() :: [flu()]. +-type seq_name() :: {'undefined' | pid(), atom(), atom()}. + -record(range, { pn_start :: non_neg_integer(), % start page number pn_end :: non_neg_integer(), % end page number @@ -30,6 +32,7 @@ -record(proj, { % Projection epoch :: non_neg_integer(), + seq :: 'undefined' | seq_name(), r :: [#range{}] }). diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl index 38332a8..7ccc9c0 100644 --- a/prototype/corfurl/src/corfurl.erl +++ b/prototype/corfurl/src/corfurl.erl @@ -24,7 +24,7 @@ new_range/3, read_projection/2, save_projection/2]). --export([append_page/3, read_page/2, scan_forward/3, +-export([append_page/2, read_page/2, scan_forward/3, fill_page/2, trim_page/2]). -include("corfurl.hrl"). @@ -40,10 +40,17 @@ -define(EVENT_LOG(X), ok). %%% -define(EVENT_LOG(X), event_logger:event(X)). -append_page(Sequencer, P, Page) -> - append_page(Sequencer, P, Page, 1). +append_page(P, Page) -> + append_page(P, Page, 1). -append_page(Sequencer, P, Page, Retries) when Retries < 50 -> +append_page(#proj{seq={undefined, SeqHost, SeqName}} = P, Page, Retries) -> + case rpc:call(SeqHost, erlang, whereis, [SeqName]) of + SeqPid when is_pid(SeqPid) -> + append_page(P#proj{seq={SeqPid, SeqHost, SeqName}}, Page, Retries); + Else -> + exit({bummer, mod, ?MODULE, line, ?LINE, error, Else}) + end; +append_page(#proj{seq={Sequencer,_,_}} = P, Page, Retries) when Retries < 50 -> case corfurl_sequencer:get(Sequencer, 1) of LPN when is_integer(LPN) -> case write_single_page(P, LPN, Page) of @@ -51,7 +58,7 @@ append_page(Sequencer, P, Page, Retries) when Retries < 50 -> {ok, LPN}; X when X == error_overwritten; X == error_trimmed -> report_lost_race(LPN, X), - append_page(Sequencer, P, Page); + append_page(P, Page); {special_trimmed, LPN}=XX -> XX; Else -> @@ -59,7 +66,7 @@ append_page(Sequencer, P, Page, Retries) when Retries < 50 -> end; _ -> timer:sleep(Retries), % TODO naive - append_page(Sequencer, P, Page, Retries * 2) + append_page(P, Page, Retries * 2) end. write_single_page(#proj{epoch=Epoch} = P, LPN, Page) -> diff --git a/prototype/corfurl/src/corfurl_sequencer.erl b/prototype/corfurl/src/corfurl_sequencer.erl index f600713..8a0240f 100644 --- a/prototype/corfurl/src/corfurl_sequencer.erl +++ b/prototype/corfurl/src/corfurl_sequencer.erl @@ -45,7 +45,10 @@ start_link(FLUs) -> start_link(FLUs, standard). start_link(FLUs, SeqType) -> - gen_server:start_link(?MODULE, {FLUs, SeqType}, []). + start_link(FLUs, SeqType, ?SERVER). + +start_link(FLUs, SeqType, RegName) -> + gen_server:start_link({local, RegName}, ?MODULE, {FLUs, SeqType}, []). stop(Pid) -> gen_server:call(Pid, stop, infinity). diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl index 72eb6bd..fc74e7b 100644 --- a/prototype/corfurl/test/corfurl_pulse.erl +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -50,7 +50,6 @@ -define(MY_KEY, ?MY_TAB). -record(run, { - seq, % Sequencer proj, % Projection flus % List of FLUs }). @@ -607,11 +606,12 @@ zipwith(F, [X|Xs], [Y|Ys]) -> [F(X, Y)|zipwith(F, Xs, Ys)]; zipwith(_, _, _) -> []. -clean_up_runtime(R) -> +clean_up_runtime(#run{flus=Flus, proj=P}) -> %% io:format(user, "clean_up_runtime: run = ~p\n", [R]), - catch corfurl_sequencer:stop(R#run.seq), - [catch corfurl_flu:stop(F) || F <- R#run.flus], - corfurl_test:setup_del_all(length(R#run.flus)). + #proj{seq={Seq,_,_}} = P, + catch corfurl_sequencer:stop(Seq), + [catch corfurl_flu:stop(F) || F <- Flus], + corfurl_test:setup_del_all(length(Flus)). make_chains(ChainLen, FLUs) -> make_chains(ChainLen, FLUs, [], []). @@ -633,7 +633,8 @@ setup(NumChains, ChainLen, PageSize, SeqType) -> Chains = make_chains(ChainLen, FLUs), %% io:format(user, "Cs = ~p\n", [Chains]), Proj = corfurl:new_simple_projection(1, 1, ?MAX_PAGES, Chains), - Run = #run{seq=Seq, proj=Proj, flus=FLUs}, + Run = #run{proj=Proj#proj{seq={Seq, node(), 'corfurl pulse seq thingie'}}, + flus=FLUs}, ets:insert(?MY_TAB, {?MY_KEY, Run}), Run. @@ -688,7 +689,7 @@ log_make_result(Result) -> log_make_result(Pid, Result) -> {result, Pid, Result}. -pick_an_LPN(Seq, SeedInt) -> +pick_an_LPN(#proj{seq={Seq,_,_}}, SeedInt) -> Max = corfurl_sequencer:get(Seq, 0), %% The sequencer may be lying to us, shouganai. if SeedInt > Max -> (SeedInt rem Max) + 1; @@ -701,12 +702,12 @@ pick_an_LPN(Seq, SeedInt) -> event_logger:event(log_make_result(LOG__Result), lamport_clock:get()), LOG__Result). -append(#run{seq=Seq, proj=Proj}, Page) -> +append(#run{proj=Proj}, Page) -> lamport_clock:init(), lamport_clock:incr(), ?LOG({append, Page}, begin - Res = corfurl:append_page(Seq, Proj, Page), + Res = corfurl:append_page(Proj, Page), perhaps_trip_append_page(?TRIP_no_append_duplicates, Res, Page) end). @@ -715,21 +716,21 @@ read_result_mangle({ok, Page}) -> read_result_mangle(Else) -> Else. -read_approx(#run{seq=Seq, proj=Proj}, SeedInt) -> +read_approx(#run{proj=Proj}, SeedInt) -> lamport_clock:init(), lamport_clock:incr(), - LPN = pick_an_LPN(Seq, SeedInt), + LPN = pick_an_LPN(Proj, SeedInt), ?LOG({read, LPN}, begin Res = read_result_mangle(corfurl:read_page(Proj, LPN)), perhaps_trip_read_approx(?TRIP_bad_read, Res, LPN) end). -scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) -> +scan_forward(#run{proj=Proj}, SeedInt, NumPages) -> lamport_clock:init(), lamport_clock:incr(), StartLPN = if SeedInt == 1 -> 1; - true -> pick_an_LPN(Seq, SeedInt) + true -> pick_an_LPN(Proj, SeedInt) end, %% Our job is complicated by the ?LOG() macro, which isn't good enough %% for our purpose: we must lie about the starting timestamp, to make @@ -757,20 +758,20 @@ scan_forward(#run{seq=Seq, proj=Proj}, SeedInt, NumPages) -> end end). -fill(#run{seq=Seq, proj=Proj}, SeedInt) -> +fill(#run{proj=Proj}, SeedInt) -> lamport_clock:init(), lamport_clock:incr(), - LPN = pick_an_LPN(Seq, SeedInt), + LPN = pick_an_LPN(Proj, SeedInt), ?LOG({fill, LPN}, begin Res = corfurl:fill_page(Proj, LPN), perhaps_trip_fill_page(?TRIP_bad_fill, Res, LPN) end). -trim(#run{seq=Seq, proj=Proj}, SeedInt) -> +trim(#run{proj=Proj}, SeedInt) -> lamport_clock:init(), lamport_clock:incr(), - LPN = pick_an_LPN(Seq, SeedInt), + LPN = pick_an_LPN(Proj, SeedInt), ?LOG({trim, LPN}, begin Res = corfurl:trim_page(Proj, LPN), diff --git a/prototype/corfurl/test/corfurl_test.erl b/prototype/corfurl/test/corfurl_test.erl index c745538..1e221f3 100644 --- a/prototype/corfurl/test/corfurl_test.erl +++ b/prototype/corfurl/test/corfurl_test.erl @@ -83,8 +83,9 @@ smoke1_test() -> lists:flatten(io_lib:format("~8..0w", [X])))} || X <- lists:seq(1, 5)], try - P1 = ?M:new_simple_projection(1, 1, 1*100, [[F1, F2, F3], [F4, F5, F6]]), - [begin {ok, LPN} = ?M:append_page(Seq, P1, Pg) end || {LPN, Pg} <- LPN_Pgs], + P0 = ?M:new_simple_projection(1, 1, 1*100, [[F1, F2, F3], [F4, F5, F6]]), + P1 = P0#proj{seq={Seq, unused, unused}}, + [begin {ok, LPN} = ?M:append_page(P1, Pg) end || {LPN, Pg} <- LPN_Pgs], [begin {ok, Pg} = ?M:read_page(P1, LPN) end || {LPN, Pg} <- LPN_Pgs], @@ -153,11 +154,11 @@ forfun_test_() -> [forfun(Procs) || Procs <- [10,100,1000,5000]] end}. -forfun_append(0, _Seq, _P, _Page) -> +forfun_append(0, _P, _Page) -> ok; -forfun_append(N, Seq, P, Page) -> +forfun_append(N, #proj{seq={Seq, _, _}} = P, Page) -> {ok, _} = ?M:append_page(Seq, P, Page), - forfun_append(N - 1, Seq, P, Page). + forfun_append(N - 1, P, Page). %%% My MBP, SSD %%% The 1K and 5K procs shows full-mailbox-scan ickiness @@ -191,13 +192,14 @@ forfun(NumProcs) -> try Chains = [[F1, F2], [F3, F4]], %%Chains = [[F1], [F2], [F3], [F4]], - P = ?M:new_simple_projection(1, 1, NumPages*2, Chains), + P0 = ?M:new_simple_projection(1, 1, NumPages*2, Chains), + P = P0#proj{seq={Seq, unused, unused}}, Me = self(), Start = now(), Ws = [begin Page = <>, spawn_link(fun() -> - forfun_append(PagesPerProc, Seq, P, Page), + forfun_append(PagesPerProc, P, Page), Me ! {done, self()} end) end || X <- lists:seq(1, NumProcs)],