Add debug logging
This commit is contained in:
parent
f41aaa265e
commit
89b04fe4fb
4 changed files with 42 additions and 22 deletions
|
@ -116,7 +116,8 @@ receive_fold_range(MRef, PID,Fun,Acc0) ->
|
||||||
{ok, Fun(K,V,Acc0)}
|
{ok, Fun(K,V,Acc0)}
|
||||||
catch
|
catch
|
||||||
Class:Exception ->
|
Class:Exception ->
|
||||||
lager:warning("Exception in hanoi fold: ~p", [Exception]),
|
io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
|
||||||
|
%% lager:warning("Exception in hanoi fold: ~p", [Exception]),
|
||||||
{'EXIT', Class, Exception, erlang:get_stacktrace()}
|
{'EXIT', Class, Exception, erlang:get_stacktrace()}
|
||||||
end
|
end
|
||||||
of
|
of
|
||||||
|
|
|
@ -67,7 +67,7 @@ debug_log(State,Fmt,Args) ->
|
||||||
Files = [if State#state.c == undefined -> $ ; true -> $C end,
|
Files = [if State#state.c == undefined -> $ ; true -> $C end,
|
||||||
if State#state.b == undefined -> $ ; true -> $B end,
|
if State#state.b == undefined -> $ ; true -> $B end,
|
||||||
if State#state.a == undefined -> $ ; true -> $A end ],
|
if State#state.a == undefined -> $ ; true -> $A end ],
|
||||||
io:format("~s~p[~s]: " ++ Fmt, [if State#state.level < 10 -> "0"; true -> "" end,
|
io:format(user,"~s~p[~s]: " ++ Fmt, [if State#state.level < 10 -> "0"; true -> "" end,
|
||||||
State#state.level,
|
State#state.level,
|
||||||
Files] ++ Args),
|
Files] ++ Args),
|
||||||
ok.
|
ok.
|
||||||
|
@ -131,10 +131,11 @@ code_change(_OldVsn, _State, _Extra) ->
|
||||||
initialize(State) ->
|
initialize(State) ->
|
||||||
|
|
||||||
try
|
try
|
||||||
initialize2(State)
|
Result = initialize2(State),
|
||||||
|
?log(" ** terminated ~p", [Result])
|
||||||
catch
|
catch
|
||||||
Class:Ex when not (Class == exit andalso Ex == normal) ->
|
Class:Ex when not (Class == exit andalso Ex == normal) ->
|
||||||
?log("crashing~n", []),
|
?log("crashing ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
|
||||||
error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -255,7 +256,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
main_loop(setelement(SetPos, State, BT))
|
main_loop(setelement(SetPos, State, BT))
|
||||||
end;
|
end;
|
||||||
E2 -> ?log("open failed ~p :: ~p~n", [ToFileName, E2]),
|
E2 -> ?log("open failed ~p :: ~p~n", [ToFileName, E2]),
|
||||||
error(E2)
|
error(E2)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|
||||||
|
@ -285,6 +286,8 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||||
demonitor(MRef, [flush]),
|
demonitor(MRef, [flush]),
|
||||||
|
|
||||||
|
?log("step_done", []),
|
||||||
|
|
||||||
State1 = State#state{ work_done = State#state.work_done + State#state.work_in_progress,
|
State1 = State#state{ work_done = State#state.work_done + State#state.work_in_progress,
|
||||||
work_in_progress = 0 },
|
work_in_progress = 0 },
|
||||||
|
|
||||||
|
@ -297,8 +300,13 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
main_loop(State2#state{ step_merge_ref=undefined });
|
main_loop(State2#state{ step_merge_ref=undefined });
|
||||||
|
|
||||||
{'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref ->
|
{MRef, step_done}=Msg ->
|
||||||
|
?log("unexpected step_done", []),
|
||||||
|
exit({bad_msg, Msg});
|
||||||
|
|
||||||
|
{'DOWN', MRef, _, _, Reason} when MRef == State#state.step_merge_ref ->
|
||||||
|
|
||||||
|
?log("merge worker died ~p", [Reason]),
|
||||||
%% current merge worker died (or just finished)
|
%% current merge worker died (or just finished)
|
||||||
|
|
||||||
case State#state.step_next_ref of
|
case State#state.step_next_ref of
|
||||||
|
@ -311,10 +319,11 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
main_loop(State2#state{ step_merge_ref=undefined, work_in_progress=0 });
|
main_loop(State2#state{ step_merge_ref=undefined, work_in_progress=0 });
|
||||||
|
|
||||||
|
|
||||||
?REPLY(MRef, ok)
|
?REPLY(MRef, step_ok)
|
||||||
when MRef =:= State#state.step_next_ref,
|
when MRef =:= State#state.step_next_ref,
|
||||||
State#state.step_merge_ref =:= undefined ->
|
State#state.step_merge_ref =:= undefined ->
|
||||||
|
|
||||||
|
?log("got step_ok", []),
|
||||||
%% this applies when we receive an OK from the next level,
|
%% this applies when we receive an OK from the next level,
|
||||||
%% and we have finished the incremental merge at this level
|
%% and we have finished the incremental merge at this level
|
||||||
|
|
||||||
|
@ -335,10 +344,12 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
true ->
|
true ->
|
||||||
hanoi_level:close(Next)
|
hanoi_level:close(Next)
|
||||||
end,
|
end,
|
||||||
ok;
|
{ok, closing};
|
||||||
|
|
||||||
?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||||
|
|
||||||
|
?log("init_range_fold ~p -> ~p", [Range, WorkerPID]),
|
||||||
|
|
||||||
case {State#state.a, State#state.b, State#state.c} of
|
case {State#state.a, State#state.b, State#state.c} of
|
||||||
{undefined, undefined, undefined} ->
|
{undefined, undefined, undefined} ->
|
||||||
FoldingPIDs = [],
|
FoldingPIDs = [],
|
||||||
|
@ -346,29 +357,29 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
{_, undefined, undefined} ->
|
{_, undefined, undefined} ->
|
||||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range, State),
|
||||||
NextList = [PID0|List],
|
NextList = [PID0|List],
|
||||||
FoldingPIDs = [PID0];
|
FoldingPIDs = [PID0];
|
||||||
|
|
||||||
{_, _, undefined} ->
|
{_, _, undefined} ->
|
||||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range, State),
|
||||||
|
|
||||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||||
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range, State),
|
||||||
|
|
||||||
NextList = [PIDA,PIDB|List],
|
NextList = [PIDA,PIDB|List],
|
||||||
FoldingPIDs = [PIDB,PIDA];
|
FoldingPIDs = [PIDB,PIDA];
|
||||||
|
|
||||||
{_, _, _} ->
|
{_, _, _} ->
|
||||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range, State),
|
||||||
|
|
||||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||||
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range, State),
|
||||||
|
|
||||||
ok = file:make_link(filename("C", State), filename("CF", State)),
|
ok = file:make_link(filename("C", State), filename("CF", State)),
|
||||||
{ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range),
|
{ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range, State),
|
||||||
|
|
||||||
NextList = [PIDA,PIDB,PIDC|List],
|
NextList = [PIDA,PIDB,PIDC|List],
|
||||||
FoldingPIDs = [PIDC,PIDB,PIDA]
|
FoldingPIDs = [PIDC,PIDB,PIDA]
|
||||||
|
@ -460,7 +471,6 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
file:rename(filename("C",State2), filename("B", State2)),
|
file:rename(filename("C",State2), filename("B", State2)),
|
||||||
check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined,
|
check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined,
|
||||||
merge_pid=undefined })
|
merge_pid=undefined })
|
||||||
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
@ -518,11 +528,11 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
{'EXIT', Parent, Reason} ->
|
{'EXIT', Parent, Reason} ->
|
||||||
plain_fsm:parent_EXIT(Reason, State);
|
plain_fsm:parent_EXIT(Reason, State);
|
||||||
|
|
||||||
{'EXIT', _, normal} ->
|
{'EXIT', _, normal} ->
|
||||||
%% Probably from a merger_pid - which we may have forgotten in the meantime.
|
%% Probably from a merger_pid - which we may have forgotten in the meantime.
|
||||||
main_loop(State);
|
main_loop(State);
|
||||||
{'EXIT', Pid, Reason} when Pid == State#state.merge_pid ->
|
{'EXIT', Pid, Reason} when Pid == State#state.merge_pid ->
|
||||||
|
|
||||||
?log("*** merge_died: ~p~n", [Reason]),
|
?log("*** merge_died: ~p~n", [Reason]),
|
||||||
restart_merge_then_loop(State#state{merge_pid=undefined}, Reason);
|
restart_merge_then_loop(State#state{merge_pid=undefined}, Reason);
|
||||||
{'EXIT', PID, _} when PID == hd(State#state.folding);
|
{'EXIT', PID, _} when PID == hd(State#state.folding);
|
||||||
|
@ -531,6 +541,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
->
|
->
|
||||||
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
||||||
{'EXIT', PID, Reason} ->
|
{'EXIT', PID, Reason} ->
|
||||||
|
?log("got unexpected exit ~p from ~p~n", [Reason, PID]),
|
||||||
error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID])
|
error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID])
|
||||||
|
|
||||||
|
|
||||||
|
@ -579,8 +590,11 @@ do_step(StepFrom, HowMuch, State) ->
|
||||||
|
|
||||||
reply_step_ok(State) ->
|
reply_step_ok(State) ->
|
||||||
case State#state.step_caller of
|
case State#state.step_caller of
|
||||||
undefined -> ok;
|
undefined ->
|
||||||
_ -> plain_rpc:send_reply(State#state.step_caller, ok)
|
ok;
|
||||||
|
_ ->
|
||||||
|
?log("step_ok -> ~p", [State#state.step_caller]),
|
||||||
|
plain_rpc:send_reply(State#state.step_caller, step_ok)
|
||||||
end,
|
end,
|
||||||
State#state{ step_caller=undefined }.
|
State#state{ step_caller=undefined }.
|
||||||
|
|
||||||
|
@ -636,7 +650,7 @@ begin_merge(State) ->
|
||||||
|
|
||||||
MergePID = proc_lib:spawn_link(fun() ->
|
MergePID = proc_lib:spawn_link(fun() ->
|
||||||
try
|
try
|
||||||
?log("merge begun", []),
|
?log("merge begun~n", []),
|
||||||
|
|
||||||
{ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName,
|
{ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName,
|
||||||
?BTREE_SIZE(State#state.level + 1),
|
?BTREE_SIZE(State#state.level + 1),
|
||||||
|
@ -672,10 +686,11 @@ filename(PFX, State) ->
|
||||||
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
|
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
|
||||||
|
|
||||||
|
|
||||||
start_range_fold(FileName, WorkerPID, Range) ->
|
start_range_fold(FileName, WorkerPID, Range, State) ->
|
||||||
Owner = self(),
|
Owner = self(),
|
||||||
PID =
|
PID =
|
||||||
proc_lib:spawn( fun() ->
|
proc_lib:spawn( fun() ->
|
||||||
|
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
|
||||||
erlang:link(WorkerPID),
|
erlang:link(WorkerPID),
|
||||||
{ok, File} = hanoi_reader:open(FileName, sequential),
|
{ok, File} = hanoi_reader:open(FileName, sequential),
|
||||||
do_range_fold(File, WorkerPID, self(), Range),
|
do_range_fold(File, WorkerPID, self(), Range),
|
||||||
|
@ -683,7 +698,8 @@ start_range_fold(FileName, WorkerPID, Range) ->
|
||||||
hanoi_reader:close(File),
|
hanoi_reader:close(File),
|
||||||
|
|
||||||
%% this will release the pinning of the fold file
|
%% this will release the pinning of the fold file
|
||||||
Owner ! {range_fold_done, self(), FileName}
|
Owner ! {range_fold_done, self(), FileName},
|
||||||
|
ok
|
||||||
end ),
|
end ),
|
||||||
{ok, PID}.
|
{ok, PID}.
|
||||||
|
|
||||||
|
|
|
@ -187,7 +187,7 @@ add_maybe_flush(Key, Value, Nursery, Top) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) ->
|
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) ->
|
||||||
ok = hanoi_nursery:finish(Nursery, Top),
|
ok = finish(Nursery, Top),
|
||||||
{error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")),
|
{error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")),
|
||||||
hanoi_nursery:new(Dir, MaxLevel).
|
hanoi_nursery:new(Dir, MaxLevel).
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,9 @@ call(PID,Request) ->
|
||||||
Reply;
|
Reply;
|
||||||
{'DOWN', MRef, _, _, Reason} ->
|
{'DOWN', MRef, _, _, Reason} ->
|
||||||
exit(Reason)
|
exit(Reason)
|
||||||
|
% after 3000 ->
|
||||||
|
% erlang:demonitor(MRef, [flush]),
|
||||||
|
% exit({rpc_timeout, Request})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue