Consoladate two 99.9% identical functions into one.

This commit is contained in:
Gregory Burd 2012-07-24 10:41:37 -04:00
parent 38307c657e
commit 67f8f2afbc

View file

@ -151,16 +151,16 @@ fold_range(Ref,Fun,Acc0,#key_range{limit=Limit}=Range) ->
true -> snapshot_range true -> snapshot_range
end, end,
{ok, FoldWorkerPID} = hanoidb_fold_worker:start(self()), {ok, FoldWorkerPID} = hanoidb_fold_worker:start(self()),
?log("fold_range begin: self=~p, worker=~p~n", [self(), FoldWorkerPID]),
ok = gen_server:call(Ref, {RangeType, FoldWorkerPID, Range}, infinity),
MRef = erlang:monitor(process, FoldWorkerPID), MRef = erlang:monitor(process, FoldWorkerPID),
?log("fold_range begin: self=~p, worker=~p monitor=~p~n", [self(), FoldWorkerPID, MRef]),
ok = gen_server:call(Ref, {RangeType, FoldWorkerPID, Range}, infinity),
Result = receive_fold_range(MRef, FoldWorkerPID, Fun, Acc0, Limit), Result = receive_fold_range(MRef, FoldWorkerPID, Fun, Acc0, Limit),
?log("fold_range done: self:~p, result=~p~n", [self(), Result]), ?log("fold_range done: self:~p, result=~p~n", [self(), Result]),
Result. Result.
receive_fold_range(MRef,PID,_,Acc0, 0) -> receive_fold_range(MRef,PID,_,Acc0, 0) ->
erlang:exit(PID, shutdown), erlang:exit(PID, shutdown),
drain_worker_and_return(MRef,PID,Acc0); drain_worker(MRef,PID,Acc0);
receive_fold_range(MRef,PID,Fun,Acc0, Limit) -> receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
?log("receive_fold_range:~p,~P~n", [PID,Acc0,10]), ?log("receive_fold_range:~p,~P~n", [PID,Acc0,10]),
@ -183,7 +183,7 @@ receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
Exit -> Exit ->
%% kill the fold worker ... %% kill the fold worker ...
erlang:exit(PID, shutdown), erlang:exit(PID, shutdown),
drain_worker_and_throw(MRef,PID,Exit) raise(drain_worker(MRef,PID,Exit))
end; end;
?CAST(_,{fold_limit, PID, _}) -> ?CAST(_,{fold_limit, PID, _}) ->
@ -213,31 +213,11 @@ decr(N) ->
raise({'EXIT', Class, Exception, Trace}) -> raise({'EXIT', Class, Exception, Trace}) ->
erlang:raise(Class, Exception, Trace). erlang:raise(Class, Exception, Trace).
%%
%% When an exception has happened in the fold function, we use
%% this to drain messages coming from the fold_worker before
%% re-throwing the exception.
%%
drain_worker_and_throw(MRef, PID, ExitTuple) ->
receive
?CALL(_From,{fold_result, PID, _, _}) ->
drain_worker_and_throw(MRef, PID, ExitTuple);
{'DOWN', MRef, _, _, _} ->
raise(ExitTuple);
?CAST(_,{fold_limit, PID, _}) ->
erlang:demonitor(MRef, [flush]),
raise(ExitTuple);
?CAST(_,{fold_done, PID}) ->
erlang:demonitor(MRef, [flush]),
raise(ExitTuple)
after 0 ->
raise(ExitTuple)
end.
drain_worker_and_return(MRef, PID, Value) -> drain_worker(MRef, PID, Value) ->
receive receive
?CALL(_From,{fold_result, PID, _, _}) -> ?CALL(_From,{fold_result, PID, _, _}) ->
drain_worker_and_return(MRef, PID, Value); drain_worker(MRef, PID, Value);
{'DOWN', MRef, _, _, _} -> {'DOWN', MRef, _, _, _} ->
Value; Value;
?CAST(_,{fold_limit, PID, _}) -> ?CAST(_,{fold_limit, PID, _}) ->
@ -315,7 +295,6 @@ open_levels(Dir, Options) ->
do_merge(TopLevel, _Inc, N) when N =< 0 -> do_merge(TopLevel, _Inc, N) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel); ok = hanoidb_level:await_incremental_merge(TopLevel);
do_merge(TopLevel, Inc, N) -> do_merge(TopLevel, Inc, N) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)), ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
do_merge(TopLevel, Inc, N-Inc). do_merge(TopLevel, Inc, N-Inc).