diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 4cde09d..180b057 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -55,6 +55,21 @@ opts = [], owner, work_done=0 }). + +-ifdef(DEBUG). +-define(log(Fmt,Args), debug_log(State,Fmt,Args)). +debug_log(State,Fmt,Args) -> + Files = [if State#state.c == undefined -> $ ; true -> $C end, + if State#state.b == undefined -> $ ; true -> $B end, + if State#state.a == undefined -> $ ; true -> $A end ], + io:format("~s~p[~s]: " ++ Fmt, [if State#state.level < 10 -> "0"; true -> "" end, + State#state.level, + Files] ++ Args), + ok. +-else. +-define(log(Fmt,Args),ok). +-endif. + %%%%% PUBLIC OPERATIONS open(Dir,Level,Next,Opts,Owner) when Level>0 -> @@ -218,11 +233,27 @@ main_loop(State = #state{ next=Next }) -> ToFileName = filename("C",State), SetPos = #state.c end, - ok = file:rename(FileName, ToFileName), - {ok, BT} = hanoi_reader:open(ToFileName, random), - reply(From, ok), - check_begin_merge_then_loop(setelement(SetPos, State, BT)); + ?log("inject ~s~n", [ToFileName]), + + case file:rename(FileName, ToFileName) of + ok -> ok; + E -> ?log("rename failed ~p -> ~p :: ~p~n", [FileName, ToFileName, E]), + error(E) + end, + + plain_rpc:send_reply(From, ok), + + case hanoi_reader:open(ToFileName, random) of + {ok, BT} -> + if SetPos == #state.b -> + check_begin_merge_then_loop(setelement(SetPos, State, BT)); + true -> + main_loop(setelement(SetPos, State, BT)) + end; + E2 -> ?log("open failed ~p :: ~p~n", [ToFileName, E2]), + error(E2) + end; ?CALL(From, unmerged_count) -> @@ -252,6 +283,8 @@ main_loop(State = #state{ next=Next }) -> WorkToDoHere = min(WorkLeftHere, HowMuch), DelegateWork = max(0,HowMuch - WorkToDoHere), + ?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]), + %% delegate the step request to the next level if Next =:= undefined; DelegateWork == 0 -> DelegateRef = undefined; @@ -464,6 +497,9 @@ main_loop(State = #state{ next=Next }) -> %% We need to push the output of merging to the next level %% ?CAST(_,{merge_done, _Count, OutFileName}) -> + + ?log("merge_done, out:~w, next:~p~n", [_Count,Next]), + State1 = if Next =:= undefined -> {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined, @@ -516,6 +552,8 @@ main_loop(State = #state{ next=Next }) -> %% Probably from a merger_pid - which we may have forgotten in the meantime. main_loop(State); {'EXIT', Pid, Reason} when Pid == State#state.merge_pid -> + + ?log("*** merge_died: ~p~n", [Reason]), restart_merge_then_loop(State#state{merge_pid=undefined}, Reason); {'EXIT', PID, _} when [PID] == tl(State#state.folding); hd(State#state.folding) == PID -> @@ -523,6 +561,7 @@ main_loop(State = #state{ next=Next }) -> {'EXIT', PID, Reason} -> error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID]) + end. reply_step_ok(State) -> @@ -577,15 +616,18 @@ begin_merge(State) -> XFileName = filename("X",State), Owner = self(), + ?log("starting merge~n", []), + file:delete(XFileName), MergePID = proc_lib:spawn_link(fun() -> try + ?log("merge begun", []), + {ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName, ?BTREE_SIZE(State#state.level + 1), State#state.next =:= undefined, State#state.opts ), -% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), Owner ! ?CAST(self(),{merge_done, OutCount, XFileName}) catch