diff --git a/src/hanoi.erl b/src/hanoi.erl index b2a0b29..086deb8 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -116,7 +116,8 @@ receive_fold_range(MRef, PID,Fun,Acc0) -> {ok, Fun(K,V,Acc0)} catch Class:Exception -> - lager:warning("Exception in hanoi fold: ~p", [Exception]), + io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]), + %% lager:warning("Exception in hanoi fold: ~p", [Exception]), {'EXIT', Class, Exception, erlang:get_stacktrace()} end of diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 13994d0..a9e5b40 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -67,7 +67,7 @@ debug_log(State,Fmt,Args) -> Files = [if State#state.c == undefined -> $ ; true -> $C end, if State#state.b == undefined -> $ ; true -> $B end, if State#state.a == undefined -> $ ; true -> $A end ], - io:format("~s~p[~s]: " ++ Fmt, [if State#state.level < 10 -> "0"; true -> "" end, + io:format(user,"~s~p[~s]: " ++ Fmt, [if State#state.level < 10 -> "0"; true -> "" end, State#state.level, Files] ++ Args), ok. @@ -131,10 +131,11 @@ code_change(_OldVsn, _State, _Extra) -> initialize(State) -> try - initialize2(State) + Result = initialize2(State), + ?log(" ** terminated ~p", [Result]) catch Class:Ex when not (Class == exit andalso Ex == normal) -> - ?log("crashing~n", []), + ?log("crashing ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]), error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) end. @@ -255,7 +256,7 @@ main_loop(State = #state{ next=Next }) -> main_loop(setelement(SetPos, State, BT)) end; E2 -> ?log("open failed ~p :: ~p~n", [ToFileName, E2]), - error(E2) + error(E2) end; @@ -285,6 +286,8 @@ main_loop(State = #state{ next=Next }) -> {MRef, step_done} when MRef == State#state.step_merge_ref -> demonitor(MRef, [flush]), + ?log("step_done", []), + State1 = State#state{ work_done = State#state.work_done + State#state.work_in_progress, work_in_progress = 0 }, @@ -297,8 +300,13 @@ main_loop(State = #state{ next=Next }) -> main_loop(State2#state{ step_merge_ref=undefined }); - {'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref -> + {MRef, step_done}=Msg -> + ?log("unexpected step_done", []), + exit({bad_msg, Msg}); + {'DOWN', MRef, _, _, Reason} when MRef == State#state.step_merge_ref -> + + ?log("merge worker died ~p", [Reason]), %% current merge worker died (or just finished) case State#state.step_next_ref of @@ -311,10 +319,11 @@ main_loop(State = #state{ next=Next }) -> main_loop(State2#state{ step_merge_ref=undefined, work_in_progress=0 }); - ?REPLY(MRef, ok) + ?REPLY(MRef, step_ok) when MRef =:= State#state.step_next_ref, State#state.step_merge_ref =:= undefined -> + ?log("got step_ok", []), %% this applies when we receive an OK from the next level, %% and we have finished the incremental merge at this level @@ -335,10 +344,12 @@ main_loop(State = #state{ next=Next }) -> true -> hanoi_level:close(Next) end, - ok; + {ok, closing}; ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> + ?log("init_range_fold ~p -> ~p", [Range, WorkerPID]), + case {State#state.a, State#state.b, State#state.c} of {undefined, undefined, undefined} -> FoldingPIDs = [], @@ -346,29 +357,29 @@ main_loop(State = #state{ next=Next }) -> {_, undefined, undefined} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range), + {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range, State), NextList = [PID0|List], FoldingPIDs = [PID0]; {_, _, undefined} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range), + {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range, State), ok = file:make_link(filename("B", State), filename("BF", State)), - {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range), + {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range, State), NextList = [PIDA,PIDB|List], FoldingPIDs = [PIDB,PIDA]; {_, _, _} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range), + {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range, State), ok = file:make_link(filename("B", State), filename("BF", State)), - {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range), + {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range, State), ok = file:make_link(filename("C", State), filename("CF", State)), - {ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range), + {ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range, State), NextList = [PIDA,PIDB,PIDC|List], FoldingPIDs = [PIDC,PIDB,PIDA] @@ -460,7 +471,6 @@ main_loop(State = #state{ next=Next }) -> file:rename(filename("C",State2), filename("B", State2)), check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined, merge_pid=undefined }) - end; %% @@ -518,11 +528,11 @@ main_loop(State = #state{ next=Next }) -> {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State); + {'EXIT', _, normal} -> %% Probably from a merger_pid - which we may have forgotten in the meantime. main_loop(State); {'EXIT', Pid, Reason} when Pid == State#state.merge_pid -> - ?log("*** merge_died: ~p~n", [Reason]), restart_merge_then_loop(State#state{merge_pid=undefined}, Reason); {'EXIT', PID, _} when PID == hd(State#state.folding); @@ -531,6 +541,7 @@ main_loop(State = #state{ next=Next }) -> -> main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); {'EXIT', PID, Reason} -> + ?log("got unexpected exit ~p from ~p~n", [Reason, PID]), error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID]) @@ -579,8 +590,11 @@ do_step(StepFrom, HowMuch, State) -> reply_step_ok(State) -> case State#state.step_caller of - undefined -> ok; - _ -> plain_rpc:send_reply(State#state.step_caller, ok) + undefined -> + ok; + _ -> + ?log("step_ok -> ~p", [State#state.step_caller]), + plain_rpc:send_reply(State#state.step_caller, step_ok) end, State#state{ step_caller=undefined }. @@ -636,7 +650,7 @@ begin_merge(State) -> MergePID = proc_lib:spawn_link(fun() -> try - ?log("merge begun", []), + ?log("merge begun~n", []), {ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName, ?BTREE_SIZE(State#state.level + 1), @@ -672,10 +686,11 @@ filename(PFX, State) -> filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data"). -start_range_fold(FileName, WorkerPID, Range) -> +start_range_fold(FileName, WorkerPID, Range, State) -> Owner = self(), PID = proc_lib:spawn( fun() -> + ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]), erlang:link(WorkerPID), {ok, File} = hanoi_reader:open(FileName, sequential), do_range_fold(File, WorkerPID, self(), Range), @@ -683,7 +698,8 @@ start_range_fold(FileName, WorkerPID, Range) -> hanoi_reader:close(File), %% this will release the pinning of the fold file - Owner ! {range_fold_done, self(), FileName} + Owner ! {range_fold_done, self(), FileName}, + ok end ), {ok, PID}. diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 36f89a5..60edf7c 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -187,7 +187,7 @@ add_maybe_flush(Key, Value, Nursery, Top) -> end. flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) -> - ok = hanoi_nursery:finish(Nursery, Top), + ok = finish(Nursery, Top), {error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")), hanoi_nursery:new(Dir, MaxLevel). diff --git a/src/plain_rpc.erl b/src/plain_rpc.erl index cc7e467..e50d7a5 100644 --- a/src/plain_rpc.erl +++ b/src/plain_rpc.erl @@ -57,6 +57,9 @@ call(PID,Request) -> Reply; {'DOWN', MRef, _, _, Reason} -> exit(Reason) +% after 3000 -> +% erlang:demonitor(MRef, [flush]), +% exit({rpc_timeout, Request}) end.