diff --git a/src/hanoi.erl b/src/hanoi.erl index 0c13d79..26486cf 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -32,7 +32,6 @@ terminate/2, code_change/3]). -export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3, - async_fold/3, async_fold_range/4, fold/3, fold_range/4]). -export([get_opt/2, get_opt/3]). @@ -46,6 +45,7 @@ %% PUBLIC API -type hanoi() :: pid(). +-type key_range() :: #btree_range{}. % @doc % Create or open existing hanoi store. Argument `Dir' names a @@ -98,11 +98,19 @@ put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> transact(Ref, TransactionSpec) -> gen_server:call(Ref, {transact, TransactionSpec}, infinity). +-type kv_fold_fun() :: fun((binary(),binary(),any())->any()). + +-spec fold(hanoi(),kv_fold_fun(),any()) -> any(). fold(Ref,Fun,Acc0) -> fold_range(Ref,Fun,Acc0,#btree_range{from_key= <<>>, to_key=undefined}). +-spec fold_range(hanoi(),kv_fold_fun(),any(),key_range()) -> any(). fold_range(Ref,Fun,Acc0,Range) -> - {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity), + if Range#btree_range.limit < 10 -> + {ok, PID} = gen_server:call(Ref, {blocking_range, self(), Range}, infinity); + true -> + {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity) + end, MRef = erlang:monitor(process, PID), receive_fold_range(MRef, PID,Fun,Acc0). @@ -116,16 +124,16 @@ receive_fold_range(MRef, PID,Fun,Acc0) -> {ok, Fun(K,V,Acc0)} catch Class:Exception -> - io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]), - %% lager:warning("Exception in hanoi fold: ~p", [Exception]), - {'EXIT', Class, Exception, erlang:get_stacktrace()} + % io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]), + % lager:warn("Exception in hanoi fold: ~p", [Exception]), + {'EXIT', Class, Exception} end of {ok, Acc1} -> receive_fold_range(MRef, PID, Fun, Acc1); Exit -> %% kill the fold worker ... - erlang:exit(PID, kill), + PID ! die, drain_worker_and_throw(MRef,PID,Exit) end; @@ -182,29 +190,6 @@ drain_worker_and_throw(MRef, PID, ExitTuple) -> end. -async_fold(Ref,Fun,Acc0) -> - async_fold_range(Ref,Fun,Acc0,#btree_range{ from_key= <<>>, to_key=undefined }). - -async_fold_range(Ref,Fun,Acc0,Range) -> - Range2 = Range#btree_range{ limit=?BTREE_ASYNC_CHUNK_SIZE }, - FoldMoreFun = fun() -> - {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity), - async_receive_fold_range(PID,Fun,Acc0,Ref,Range2) - end, - {async, FoldMoreFun}. - -async_receive_fold_range(PID,Fun,Acc0,Ref,Range) -> - receive - {fold_result, PID, K,V} -> - async_receive_fold_range(PID, Fun, Fun(K,V,Acc0), Ref, Range); - {fold_limit, PID, Key} -> - Range2 = Range#btree_range{ from_key = Key, from_inclusive=true }, - async_fold_range(Ref, Fun, Acc0, Range2); - {fold_done, PID} -> - {ok, Acc0} - end. - - init([Dir, Opts]) -> case file:read_file_info(Dir) of {ok, #file_info{ type=directory }} -> diff --git a/src/hanoi_fold_worker.erl b/src/hanoi_fold_worker.erl index 8603c4c..9f4cb52 100644 --- a/src/hanoi_fold_worker.erl +++ b/src/hanoi_fold_worker.erl @@ -25,6 +25,7 @@ -module(hanoi_fold_worker). -author('Kresten Krab Thorup '). +-define(log(Fmt,Args),ok). %% %% This worker is used to merge fold results from individual @@ -77,6 +78,9 @@ initialize(State, PrefixFolders) -> Parent = plain_fsm:info(parent), receive + die -> + ok; + {prefix, [_]=Folders} -> initialize(State, Folders); @@ -92,6 +96,7 @@ initialize(State, PrefixFolders) -> {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State) + end. @@ -100,11 +105,17 @@ fill(State, Values, []) -> fill(State, Values, [PID|Rest]=PIDs) -> receive + die -> + ok; + {level_done, PID} -> + ?log( "{level_done, ~p}~n", [PID]), fill(State, lists:keydelete(PID, 1, Values), Rest); {level_limit, PID, Key} -> + ?log( "{level_limit, ~p}~n", [PID]), fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,limit}}), Rest); {level_result, PID, Key, Value} -> + ?log( "{level_result, ~p, ~p, ...}~n", [PID, Key]), fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest); %% gen_fsm handling @@ -124,10 +135,15 @@ fill(State, Values, [PID|Rest]=PIDs) -> end. emit_next(State, []) -> - State#state.sendto ! {fold_done, self()}, + ?log( "emit_next ~p~n", [[]]), + Msg = {fold_done, self()}, + Target = State#state.sendto, + ?log( "~p ! ~p~n", [Target, Msg]), + Target ! Msg, end_of_fold(State); emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> + ?log( "emit_next ~p~n", [Values]), case lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 -> {KV,[P]}; @@ -142,10 +158,12 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> {{_, ?TOMBSTONE}, FillFrom} -> fill(State, Values, FillFrom); {{Key, limit}, _} -> + ?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]), State#state.sendto ! {fold_limit, self(), Key}, end_of_fold(State); - {{FoundKey, FoundValue}, FillFrom} -> - State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}, + {{Key, Value}, FillFrom} -> + ?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]), + State#state.sendto ! {fold_result, self(), Key, Value}, fill(State, Values, FillFrom) end. diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 0de32a1..fe7be2c 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -225,17 +225,38 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> Ref = erlang:make_ref(), FoldWorkerPID ! {prefix, [Ref]}, - lists:foreach(fun({Key,Value}) -> - case ?KEY_IN_RANGE(Key,KeyRange) of - true -> - FoldWorkerPID ! {level_result, Ref, Key, Value}; - false -> - ok - end - end, - gb_trees:to_list(Cache)), - FoldWorkerPID ! {level_done, Ref}, + case lists:foldl(fun(_,{LastKey,limit}) -> + {LastKey,limit}; + ({Key,Value}, {LastKey,Count}) -> + case ?KEY_IN_RANGE(Key,KeyRange) of + true -> + FoldWorkerPID ! {level_result, Ref, Key, Value}, + case Value of + ?TOMBSTONE -> + {Key, Count}; + _ -> + {Key, decrement(Count)} + end; + false -> + {LastKey, Count} + end + end, + {undefined, KeyRange#btree_range.limit}, + gb_trees:to_list(Cache)) + of + {LastKey, limit} when LastKey =/= undefined -> + FoldWorkerPID ! {level_limit, Ref, LastKey}; + _ -> + FoldWorkerPID ! {level_done, Ref} + end, ok. set_max_level(Nursery = #nursery{}, MaxLevel) -> Nursery#nursery{ max_level = MaxLevel }. + +decrement(undefined) -> + undefined; +decrement(1) -> + limit; +decrement(Number) -> + Number-1. diff --git a/src/riak_kv_hanoi_backend.erl b/src/riak_kv_hanoi_backend.erl index e5d05b6..706668a 100644 --- a/src/riak_kv_hanoi_backend.erl +++ b/src/riak_kv_hanoi_backend.erl @@ -197,7 +197,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{tree=Tree}) -> fun() -> fold_list_buckets(<<>>, Tree, FoldBucketsFun, Acc) end, - case lists:member(async_fold, Opts) of + case proplists:get_bool(async_fold, Opts) of true -> {async, BucketFolder}; false -> @@ -221,8 +221,9 @@ fold_list_buckets(PrevBucket, Tree, FoldBucketsFun, Acc) -> %% grab next bucket, it's a limit=1 range query :-) case hanoi:fold_range(Tree, fun(BucketKey,_Value,none) -> + ?log( "IN_FOLDER ~p~n", [BucketKey]), case from_object_key(BucketKey) of - {o, Bucket, _Key} -> + {Bucket, _Key} -> [Bucket]; _ -> none @@ -232,8 +233,10 @@ fold_list_buckets(PrevBucket, Tree, FoldBucketsFun, Acc) -> Range) of none -> + ?log( "NO_MORE_BUCKETS~n", []), Acc; [Bucket] -> + ?log( "NEXT_BUCKET ~p~n", [Bucket]), fold_list_buckets(Bucket, Tree, FoldBucketsFun, FoldBucketsFun(Bucket, Acc)) end. @@ -259,20 +262,11 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{tree=Tree}) -> %% Set up the fold... FoldFun = fold_keys_fun(FoldKeysFun, Limiter), Range = to_key_range(Limiter), - KeyFolder = - fun() -> - try - hanoi:fold_range(Tree, FoldFun, Acc, Range) - catch - {break, AccFinal} -> - AccFinal - end - end, - case lists:member(async_fold, Opts) of + case proplists:get_bool(async_fold, Opts) of true -> - {async, KeyFolder}; + {async, fun() -> hanoi:fold_range(Tree, FoldFun, Acc, Range) end}; false -> - {ok, KeyFolder()} + {ok, hanoi:fold_range(Tree, FoldFun, Acc, Range)} end. %% @doc Fold over all the objects for one or all buckets. @@ -285,14 +279,9 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) -> FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> - try - hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket)) - catch - {break, AccFinal} -> - AccFinal - end + hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket)) end, - case lists:member(async_fold, Opts) of + case proplists:get_bool(async_fold, Opts) of true -> {async, ObjectFolder}; false -> @@ -398,13 +387,12 @@ fold_keys_fun(_FoldKeysFun, Other) -> %% @private %% Return a function to fold over the objects on this backend fold_objects_fun(FoldObjectsFun, FilterBucket) -> - fun(Key, Value, Acc) -> - case from_object_key(Key) of + fun(StorageKey, Value, Acc) -> + ?log( "OFOLD: ~p, filter=~p~n", [sext:decode(StorageKey), FilterBucket]), + case from_object_key(StorageKey) of {Bucket, Key} when FilterBucket == undefined; Bucket == FilterBucket -> - FoldObjectsFun(Bucket, Key, Value, Acc); - _ -> - Acc + FoldObjectsFun(Bucket, Key, Value, Acc) end end.