diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index 67f6adb..3831a04 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -79,6 +79,8 @@ sync_fold_range(Ref,Fun,Acc0,Range) -> sync_receive_fold_range(MRef, PID,Fun,Acc0) -> receive + + %% receive one K/V from fold_worker {fold_result, PID, K,V} -> case try @@ -96,6 +98,25 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> erlang:exit(PID, kill), drain_worker_and_throw(MRef,PID,Exit) end; + + %% receive multiple KVs from fold_worker + {fold_results, PID, KVs} -> + case + try + {ok, kvfoldl(Fun,Acc0,KVs)} + catch + Class:Exception -> + lager:warning("Exception in lsm_btree fold: ~p", [Exception]), + {'EXIT', Class, Exception, erlang:get_stacktrace()} + end + of + {ok, Acc1} -> + sync_receive_fold_range(MRef, PID, Fun, Acc1); + Exit -> + %% kill the fold worker ... + erlang:exit(PID, kill), + drain_worker_and_throw(MRef,PID,Exit) + end; {fold_limit, PID, _} -> erlang:demonitor(MRef, [flush]), Acc0; @@ -106,6 +127,13 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> error({fold_worker_died, Reason}) end. +kvfoldl(_Fun,Acc0,[]) -> + Acc0; +kvfoldl(Fun,Acc0,[{K,V}|T]) -> + kvfoldl(Fun, Fun(K,V,Acc0), T). + + + raise({'EXIT', Class, Exception, Trace}) -> erlang:raise(Class, Exception, Trace). @@ -113,6 +141,8 @@ drain_worker_and_throw(MRef, PID, ExitTuple) -> receive {fold_result, PID, _, _} -> drain_worker_and_throw(MRef, PID, ExitTuple); + {fold_results, PID, _} -> + drain_worker_and_throw(MRef, PID, ExitTuple); {'DOWN', MRef, _, _, _} -> raise(ExitTuple); {fold_limit, PID, _} ->