diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl index ab0b127..7a1dd0d 100644 --- a/prototype/corfurl/src/corfurl.erl +++ b/prototype/corfurl/src/corfurl.erl @@ -36,6 +36,10 @@ -endif. -endif. +%%% Debugging: for extra events in the PULSE event log, use the 2nd statement. +-define(EVENT_LOG(X), ok). +%%% -define(EVENT_LOG(X), event_logger:event(X)). + append_page(Sequencer, P, Page) -> append_page(Sequencer, P, Page, 1). @@ -77,7 +81,6 @@ write_single_page_to_chain([FLU|Rest], Epoch, LPN, Page, Nth) -> %% Whoa, partner, you're movin' kinda fast for a trim. %% This might've been due to us being too slow and someone %% else junked us. - %% TODO We should go trim our previously successful writes? error_trimmed; error_overwritten when Nth == 1 -> %% The sequencer lied, or we didn't use the sequencer and @@ -131,49 +134,75 @@ ok_or_trim(Else) -> Else. read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) -> + ?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}), case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of {ok, Page} -> + ?EVENT_LOG({read_repair, LPN, Head, ok}), read_repair_chain2(Rest, Epoch, LPN, Page, Chain); error_badepoch -> + ?EVENT_LOG({read_repair, LPN, Head, badepoch}), error_badepoch; error_trimmed -> + ?EVENT_LOG({read_repair, LPN, Head, trimmed}), %% TODO: robustify - [ok = case ok_or_trim(corfurl_flu:fill(flu_pid(X), Epoch, LPN)) of - ok -> ok; - error_overwritten -> ok_or_trim(corfurl_flu:trim( - flu_pid(X), Epoch, LPN)); - Else -> Else - end || X <- Rest], + [begin + ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X)}), + ok = case ok_or_trim(corfurl_flu:fill(flu_pid(X), Epoch, + LPN)) of + ok -> + ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), ok}), + ok; + error_overwritten -> + ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), overwritten, try_to_trim}), + Res2 = ok_or_trim(corfurl_flu:trim( + flu_pid(X), Epoch, LPN)), + ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}), + Res2; + Else -> + 
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}), + Else + end + end || X <- Rest], error_trimmed; error_unwritten -> + ?EVENT_LOG({read_repair, LPN, read, Head, unwritten}), error_unwritten %% Let it crash: error_overwritten end. read_repair_chain2([] = _Repairees, _Epoch, _LPN, Page, _OriginalChain) -> + ?EVENT_LOG({read_repair2, _LPN, finished, {ok, Page}}), {ok, Page}; read_repair_chain2([RepairFLU|Rest], Epoch, LPN, Page, OriginalChain) -> case corfurl_flu:write(flu_pid(RepairFLU), Epoch, LPN, Page) of ok -> + ?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), ok}), read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain); error_badepoch -> + ?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), badepoch}), error_badepoch; error_trimmed -> + ?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), trimmed}), error_trimmed; error_overwritten -> + ?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), overwritten}), %% We're going to do an optional sanity check here. %% TODO: make the sanity check configurable? case corfurl_flu:read(flu_pid(RepairFLU), Epoch, LPN) of {ok, Page2} when Page2 =:= Page -> - %% TODO: is there a need to continue working upstream - %% to fix problems? - {ok, Page2}; + ?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), exact_page}), + %% We're probably going to be racing against someone else + %% that's also doing repair, but so be it. 
+ read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain); {ok, _Page2} -> + ?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), bad_page, _Page2}), giant_error({bummerbummer, ?MODULE, ?LINE, sanity_check_failure, lpn, LPN, epoch, Epoch}); error_badepoch -> + ?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), badepoch}), error_badepoch; error_trimmed -> + ?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), trimmed}), %% Start repair at the beginning to handle this case read_repair_chain(Epoch, LPN, OriginalChain) %% Let it crash: error_overwritten, error_unwritten diff --git a/prototype/corfurl/src/corfurl_flu.erl b/prototype/corfurl/src/corfurl_flu.erl index ba370da..d847d5d 100644 --- a/prototype/corfurl/src/corfurl_flu.erl +++ b/prototype/corfurl/src/corfurl_flu.erl @@ -45,6 +45,10 @@ -include_lib("kernel/include/file.hrl"). +%%% Debugging: for extra events in the PULSE event log, use the 2nd statement. +-define(EVENT_LOG(X), ok). +%%% -define(EVENT_LOG(X), event_logger:event(X)). 
+ -record(state, { dir :: string(), mem_fh :: term(), @@ -159,6 +163,7 @@ handle_call({{write, _ClientEpoch, LogicalPN, PageBin}, LC1}, _From, {ok, Offset} -> ok = write_page(Offset, LogicalPN, PageBin, State), NewMLPN = erlang:max(LogicalPN, MLPN), + ?EVENT_LOG({flu, write, self(), LogicalPN, ok}), {reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}}; Else -> {reply, {Else, LC2}, State} @@ -191,6 +196,7 @@ handle_call({{trim, ClientEpoch, _LogicalPN}, LC1}, _From, handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> LC2 = lamport_clock:update(LC1), {Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State), + ?EVENT_LOG({flu, trim, self(), LogicalPN, Reply}), {reply, {Reply, LC2}, NewState}; handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From, @@ -201,6 +207,7 @@ handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From, handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> LC2 = lamport_clock:update(LC1), {Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State), + ?EVENT_LOG({flu, fill, self(), LogicalPN, Reply}), {reply, {Reply, LC2}, NewState}; handle_call(get__mlp, _From, State) -> diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl index e687dbe..7a3c645 100644 --- a/prototype/corfurl/test/corfurl_pulse.erl +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -595,7 +595,7 @@ make_chains(_ChainLen, [], SmallAcc, BigAcc) -> [lists:reverse(SmallAcc)|BigAcc]; make_chains(ChainLen, [H|T], SmallAcc, BigAcc) -> if length(SmallAcc) == ChainLen -> - make_chains(ChainLen, T, [H], [SmallAcc|BigAcc]); + make_chains(ChainLen, T, [H], [lists:reverse(SmallAcc)|BigAcc]); true -> make_chains(ChainLen, T, [H|SmallAcc], BigAcc) end.