Allow fold worker to send {fold_results, PID, KVs}
Not just individual KVs, but lists of KVs
This commit is contained in:
parent
b70d2af1da
commit
4e53b0a083
1 changed files with 30 additions and 0 deletions
|
@ -79,6 +79,8 @@ sync_fold_range(Ref,Fun,Acc0,Range) ->
|
||||||
|
|
||||||
sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
||||||
receive
|
receive
|
||||||
|
|
||||||
|
%% receive one K/V from fold_worker
|
||||||
{fold_result, PID, K,V} ->
|
{fold_result, PID, K,V} ->
|
||||||
case
|
case
|
||||||
try
|
try
|
||||||
|
@ -96,6 +98,25 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
||||||
erlang:exit(PID, kill),
|
erlang:exit(PID, kill),
|
||||||
drain_worker_and_throw(MRef,PID,Exit)
|
drain_worker_and_throw(MRef,PID,Exit)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% receive multiple KVs from fold_worker
|
||||||
|
{fold_results, PID, KVs} ->
|
||||||
|
case
|
||||||
|
try
|
||||||
|
{ok, kvfoldl(Fun,Acc0,KVs)}
|
||||||
|
catch
|
||||||
|
Class:Exception ->
|
||||||
|
lager:warning("Exception in lsm_btree fold: ~p", [Exception]),
|
||||||
|
{'EXIT', Class, Exception, erlang:get_stacktrace()}
|
||||||
|
end
|
||||||
|
of
|
||||||
|
{ok, Acc1} ->
|
||||||
|
sync_receive_fold_range(MRef, PID, Fun, Acc1);
|
||||||
|
Exit ->
|
||||||
|
%% kill the fold worker ...
|
||||||
|
erlang:exit(PID, kill),
|
||||||
|
drain_worker_and_throw(MRef,PID,Exit)
|
||||||
|
end;
|
||||||
{fold_limit, PID, _} ->
|
{fold_limit, PID, _} ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
Acc0;
|
Acc0;
|
||||||
|
@ -106,6 +127,13 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
||||||
error({fold_worker_died, Reason})
|
error({fold_worker_died, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
kvfoldl(_Fun,Acc0,[]) ->
|
||||||
|
Acc0;
|
||||||
|
kvfoldl(Fun,Acc0,[{K,V}|T]) ->
|
||||||
|
kvfoldl(Fun, Fun(K,V,Acc0), T).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
raise({'EXIT', Class, Exception, Trace}) ->
|
raise({'EXIT', Class, Exception, Trace}) ->
|
||||||
erlang:raise(Class, Exception, Trace).
|
erlang:raise(Class, Exception, Trace).
|
||||||
|
|
||||||
|
@ -113,6 +141,8 @@ drain_worker_and_throw(MRef, PID, ExitTuple) ->
|
||||||
receive
|
receive
|
||||||
{fold_result, PID, _, _} ->
|
{fold_result, PID, _, _} ->
|
||||||
drain_worker_and_throw(MRef, PID, ExitTuple);
|
drain_worker_and_throw(MRef, PID, ExitTuple);
|
||||||
|
{fold_results, PID, _} ->
|
||||||
|
drain_worker_and_throw(MRef, PID, ExitTuple);
|
||||||
{'DOWN', MRef, _, _, _} ->
|
{'DOWN', MRef, _, _, _} ->
|
||||||
raise(ExitTuple);
|
raise(ExitTuple);
|
||||||
{fold_limit, PID, _} ->
|
{fold_limit, PID, _} ->
|
||||||
|
|
Loading…
Reference in a new issue