Add ?EVENT_LOG() to add extra trace info to corfurl and corfurl_flu

This commit is contained in:
Scott Lystig Fritchie 2014-02-24 12:03:48 +09:00
parent 479efce0b1
commit b7e3f91931
3 changed files with 47 additions and 11 deletions

View file

@ -36,6 +36,10 @@
-endif. -endif.
-endif. -endif.
%%% Debugging: for extra events in the PULSE event log, use the 2nd statement.
-define(EVENT_LOG(X), ok).
%%% -define(EVENT_LOG(X), event_logger:event(X)).
append_page(Sequencer, P, Page) -> append_page(Sequencer, P, Page) ->
append_page(Sequencer, P, Page, 1). append_page(Sequencer, P, Page, 1).
@ -77,7 +81,6 @@ write_single_page_to_chain([FLU|Rest], Epoch, LPN, Page, Nth) ->
%% Whoa, partner, you're movin' kinda fast for a trim. %% Whoa, partner, you're movin' kinda fast for a trim.
%% This might've been due to us being too slow and someone %% This might've been due to us being too slow and someone
%% else junked us. %% else junked us.
%% TODO We should go trim our previously successful writes?
error_trimmed; error_trimmed;
error_overwritten when Nth == 1 -> error_overwritten when Nth == 1 ->
%% The sequencer lied, or we didn't use the sequencer and %% The sequencer lied, or we didn't use the sequencer and
@ -131,49 +134,75 @@ ok_or_trim(Else) ->
Else. Else.
read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) -> read_repair_chain(Epoch, LPN, [Head|Rest] = Chain) ->
?EVENT_LOG({read_repair, LPN, Chain, i_am, self()}),
case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of case corfurl_flu:read(flu_pid(Head), Epoch, LPN) of
{ok, Page} -> {ok, Page} ->
?EVENT_LOG({read_repair, LPN, Head, ok}),
read_repair_chain2(Rest, Epoch, LPN, Page, Chain); read_repair_chain2(Rest, Epoch, LPN, Page, Chain);
error_badepoch -> error_badepoch ->
?EVENT_LOG({read_repair, LPN, Head, badepoch}),
error_badepoch; error_badepoch;
error_trimmed -> error_trimmed ->
?EVENT_LOG({read_repair, LPN, Head, trimmed}),
%% TODO: robustify %% TODO: robustify
[ok = case ok_or_trim(corfurl_flu:fill(flu_pid(X), Epoch, LPN)) of [begin
ok -> ok; ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X)}),
error_overwritten -> ok_or_trim(corfurl_flu:trim( ok = case ok_or_trim(corfurl_flu:fill(flu_pid(X), Epoch,
flu_pid(X), Epoch, LPN)); LPN)) of
Else -> Else ok ->
end || X <- Rest], ?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), ok}),
ok;
error_overwritten ->
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), overwritten, try_to_trim}),
Res2 = ok_or_trim(corfurl_flu:trim(
flu_pid(X), Epoch, LPN)),
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), trim, Res2}),
Res2;
Else ->
?EVENT_LOG({read_repair, LPN, fill, flu_pid(X), Else}),
Else
end
end || X <- Rest],
error_trimmed; error_trimmed;
error_unwritten -> error_unwritten ->
?EVENT_LOG({read_repair, LPN, read, Head, unwritten}),
error_unwritten error_unwritten
%% Let it crash: error_overwritten %% Let it crash: error_overwritten
end. end.
read_repair_chain2([] = _Repairees, _Epoch, _LPN, Page, _OriginalChain) -> read_repair_chain2([] = _Repairees, _Epoch, _LPN, Page, _OriginalChain) ->
?EVENT_LOG({read_repair2, _LPN, finished, {ok, Page}}),
{ok, Page}; {ok, Page};
read_repair_chain2([RepairFLU|Rest], Epoch, LPN, Page, OriginalChain) -> read_repair_chain2([RepairFLU|Rest], Epoch, LPN, Page, OriginalChain) ->
case corfurl_flu:write(flu_pid(RepairFLU), Epoch, LPN, Page) of case corfurl_flu:write(flu_pid(RepairFLU), Epoch, LPN, Page) of
ok -> ok ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), ok}),
read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain); read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain);
error_badepoch -> error_badepoch ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), badepoch}),
error_badepoch; error_badepoch;
error_trimmed -> error_trimmed ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), trimmed}),
error_trimmed; error_trimmed;
error_overwritten -> error_overwritten ->
?EVENT_LOG({read_repair2, LPN, write, flu_pid(RepairFLU), overwritten}),
%% We're going to do an optional sanity check here. %% We're going to do an optional sanity check here.
%% TODO: make the sanity check configurable? %% TODO: make the sanity check configurable?
case corfurl_flu:read(flu_pid(RepairFLU), Epoch, LPN) of case corfurl_flu:read(flu_pid(RepairFLU), Epoch, LPN) of
{ok, Page2} when Page2 =:= Page -> {ok, Page2} when Page2 =:= Page ->
%% TODO: is there a need to continue working upstream ?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), exact_page}),
%% to fix problems? %% We're probably going to be racing against someone else
{ok, Page2}; %% that's also doing repair, but so be it.
read_repair_chain2(Rest, Epoch, LPN, Page, OriginalChain);
{ok, _Page2} -> {ok, _Page2} ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), bad_page, _Page2}),
giant_error({bummerbummer, ?MODULE, ?LINE, giant_error({bummerbummer, ?MODULE, ?LINE,
sanity_check_failure, lpn, LPN, epoch, Epoch}); sanity_check_failure, lpn, LPN, epoch, Epoch});
error_badepoch -> error_badepoch ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), badepoch}),
error_badepoch; error_badepoch;
error_trimmed -> error_trimmed ->
?EVENT_LOG({read_repair2, LPN, read, flu_pid(RepairFLU), trimmed}),
%% Start repair at the beginning to handle this case %% Start repair at the beginning to handle this case
read_repair_chain(Epoch, LPN, OriginalChain) read_repair_chain(Epoch, LPN, OriginalChain)
%% Let it crash: error_overwritten, error_unwritten %% Let it crash: error_overwritten, error_unwritten

View file

@ -45,6 +45,10 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
%%% Debugging: for extra events in the PULSE event log, use the 2nd statement.
-define(EVENT_LOG(X), ok).
%%% -define(EVENT_LOG(X), event_logger(X)).
-record(state, { -record(state, {
dir :: string(), dir :: string(),
mem_fh :: term(), mem_fh :: term(),
@ -159,6 +163,7 @@ handle_call({{write, _ClientEpoch, LogicalPN, PageBin}, LC1}, _From,
{ok, Offset} -> {ok, Offset} ->
ok = write_page(Offset, LogicalPN, PageBin, State), ok = write_page(Offset, LogicalPN, PageBin, State),
NewMLPN = erlang:max(LogicalPN, MLPN), NewMLPN = erlang:max(LogicalPN, MLPN),
?EVENT_LOG({flu, write, self(), LogicalPN, ok}),
{reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}}; {reply, {ok, LC2}, State#state{max_logical_page=NewMLPN}};
Else -> Else ->
{reply, {Else, LC2}, State} {reply, {Else, LC2}, State}
@ -191,6 +196,7 @@ handle_call({{trim, ClientEpoch, _LogicalPN}, LC1}, _From,
handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> handle_call({{trim, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lamport_clock:update(LC1), LC2 = lamport_clock:update(LC1),
{Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State), {Reply, NewState} = do_trim_or_fill(trim, LogicalPN, State),
?EVENT_LOG({flu, trim, self(), LogicalPN, Reply}),
{reply, {Reply, LC2}, NewState}; {reply, {Reply, LC2}, NewState};
handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From, handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From,
@ -201,6 +207,7 @@ handle_call({{fill, ClientEpoch, _LogicalPN}, LC1}, _From,
handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) -> handle_call({{fill, _ClientEpoch, LogicalPN}, LC1}, _From, State) ->
LC2 = lamport_clock:update(LC1), LC2 = lamport_clock:update(LC1),
{Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State), {Reply, NewState} = do_trim_or_fill(fill, LogicalPN, State),
?EVENT_LOG({flu, fill, self(), LogicalPN, Reply}),
{reply, {Reply, LC2}, NewState}; {reply, {Reply, LC2}, NewState};
handle_call(get__mlp, _From, State) -> handle_call(get__mlp, _From, State) ->

View file

@ -595,7 +595,7 @@ make_chains(_ChainLen, [], SmallAcc, BigAcc) ->
[lists:reverse(SmallAcc)|BigAcc]; [lists:reverse(SmallAcc)|BigAcc];
make_chains(ChainLen, [H|T], SmallAcc, BigAcc) -> make_chains(ChainLen, [H|T], SmallAcc, BigAcc) ->
if length(SmallAcc) == ChainLen -> if length(SmallAcc) == ChainLen ->
make_chains(ChainLen, T, [H], [SmallAcc|BigAcc]); make_chains(ChainLen, T, [H], [lists:reverse(SmallAcc)|BigAcc]);
true -> true ->
make_chains(ChainLen, T, [H|SmallAcc], BigAcc) make_chains(ChainLen, T, [H|SmallAcc], BigAcc)
end. end.