diff --git a/src/hanoi.erl b/src/hanoi.erl index 70ab835..f020879 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -32,7 +32,8 @@ terminate/2, code_change/3]). -export([open/1, close/1, get/2, lookup/2, delete/2, put/3, - async_range/2, async_fold_range/4, sync_range/2, sync_fold_range/4]). + async_fold/3, async_fold_range/4, + fold/3, fold_range/4]). -include("hanoi.hrl"). -include_lib("kernel/include/file.hrl"). @@ -69,15 +70,15 @@ delete(Ref,Key) when is_binary(Key) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> gen_server:call(Ref, {put, Key, Value}, infinity). -sync_range(Ref, #btree_range{}=Range) -> - gen_server:call(Ref, {sync_range, self(), Range}, infinity). +fold(Ref,Fun,Acc0) -> + fold_range(Ref,Fun,Acc0,#btree_range{from_key= <<>>, to_key=undefined}). -sync_fold_range(Ref,Fun,Acc0,Range) -> - {ok, PID} = sync_range(Ref, Range), +fold_range(Ref,Fun,Acc0,Range) -> + {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity), MRef = erlang:monitor(process, PID), - sync_receive_fold_range(MRef, PID,Fun,Acc0). + receive_fold_range(MRef, PID,Fun,Acc0). -sync_receive_fold_range(MRef, PID,Fun,Acc0) -> +receive_fold_range(MRef, PID,Fun,Acc0) -> receive %% receive one K/V from fold_worker @@ -92,7 +93,7 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> end of {ok, Acc1} -> - sync_receive_fold_range(MRef, PID, Fun, Acc1); + receive_fold_range(MRef, PID, Fun, Acc1); Exit -> %% kill the fold worker ... erlang:exit(PID, kill), @@ -111,7 +112,7 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> end of {ok, Acc1} -> - sync_receive_fold_range(MRef, PID, Fun, Acc1); + receive_fold_range(MRef, PID, Fun, Acc1); Exit -> %% kill the fold worker ... erlang:exit(PID, kill), @@ -152,13 +153,13 @@ drain_worker_and_throw(MRef, PID, ExitTuple) -> end. -async_range(Ref, #btree_range{}=Range) -> - gen_server:call(Ref, {async_range, self(), Range}, infinity). +async_fold(Ref,Fun,Acc0) -> + async_fold_range(Ref,Fun,Acc0,#btree_range{ from_key= <<>>, to_key=undefined }). async_fold_range(Ref,Fun,Acc0,Range) -> Range2 = Range#btree_range{ limit=?BTREE_ASYNC_CHUNK_SIZE }, FoldMoreFun = fun() -> - {ok, PID} = gen_server:call(Ref, {sync_range, self(), Range}, infinity), + {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity), async_receive_fold_range(PID,Fun,Acc0,Ref,Range2) end, {async, FoldMoreFun}. @@ -257,16 +258,16 @@ code_change(_OldVsn, State, _Extra) -> -handle_call({async_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> +handle_call({snapshot_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> {ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender), hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), - Result = hanoi_level:async_range(TopLevel, FoldWorkerPID, Range), + Result = hanoi_level:snapshot_range(TopLevel, FoldWorkerPID, Range), {reply, Result, State}; -handle_call({sync_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> +handle_call({blocking_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> {ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender), hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), - Result = hanoi_level:sync_range(TopLevel, FoldWorkerPID, Range), + Result = hanoi_level:blocking_range(TopLevel, FoldWorkerPID, Range), {reply, Result, State}; handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> diff --git a/src/hanoi.hrl b/src/hanoi.hrl index b247996..3cc0ce0 100644 --- a/src/hanoi.hrl +++ b/src/hanoi.hrl @@ -25,8 +25,8 @@ -author('Kresten Krab Thorup '). -%% smallest levels are 128 entries --define(TOP_LEVEL, 7). +%% smallest levels are 256 entries +-define(TOP_LEVEL, 8). -define(BTREE_SIZE(Level), (1 bsl (Level))). -define(TOMBSTONE, 'deleted'). diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index b214555..448e645 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -42,7 +42,7 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, incremental_merge/2]). +-export([open/3, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]). -include_lib("kernel/include/file.hrl"). @@ -81,14 +81,14 @@ close(Ref) -> -async_range(Ref, FoldWorkerPID, Range) -> +snapshot_range(Ref, FoldWorkerPID, Range) -> proc_lib:spawn(fun() -> {ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders} end), {ok, FoldWorkerPID}. -sync_range(Ref, FoldWorkerPID, Range) -> +blocking_range(Ref, FoldWorkerPID, Range) -> {ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders}, {ok, FoldWorkerPID}. @@ -381,9 +381,10 @@ main_loop(State = #state{ next=Next }) -> main_loop(State#state{ folding = FoldingPIDs }); - {range_fold_done, PID, [_,$F|_]=FoldFileName} -> + {range_fold_done, PID, FoldFileName} -> ok = file:delete(FoldFileName), - main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); + NewFolding = lists:delete(PID,State#state.folding), + main_loop(State#state{ folding = NewFolding }); ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> @@ -602,6 +603,7 @@ start_range_fold(FileName, WorkerPID, Range) -> {ok, File} = hanoi_reader:open(FileName, sequential), do_range_fold(File, WorkerPID, self(), Range), erlang:unlink(WorkerPID), + hanoi_reader:close(File), %% this will release the pinning of the fold file Owner ! {range_fold_done, self(), FileName} diff --git a/src/riak_kv_hanoi_backend.erl b/src/riak_kv_hanoi_backend.erl index 5ceab7e..1578d65 100644 --- a/src/riak_kv_hanoi_backend.erl +++ b/src/riak_kv_hanoi_backend.erl @@ -170,7 +170,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{tree=Tree}) -> BucketFolder = fun() -> try - hanoi:sync_fold_range(Tree, FoldFun, {Acc, []}, #btree_range{}) + hanoi:fold_range(Tree, FoldFun, {Acc, []}, #btree_range{}) catch {break, AccFinal} -> AccFinal @@ -213,7 +213,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{tree=Tree}) -> KeyFolder = fun() -> try - hanoi:sync_fold_range(Tree, FoldFun, Acc, Range) + hanoi:fold_range(Tree, FoldFun, Acc, Range) catch {break, AccFinal} -> AccFinal @@ -250,7 +250,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) -> ObjectFolder = fun() -> try - hanoi:sync_fold_range(Tree, FoldFun, Acc, bucket_range(Bucket)) + hanoi:fold_range(Tree, FoldFun, Acc, bucket_range(Bucket)) catch {break, AccFinal} -> AccFinal @@ -276,7 +276,7 @@ is_empty(#state{tree=Tree}) -> FoldFun = fun(_K, _V, _Acc) -> throw(ok) end, try Range = #btree_range{}, - [] =:= hanoi:sync_fold_range(Tree, FoldFun, [], Range) + [] =:= hanoi:fold_range(Tree, FoldFun, [], Range) catch _:ok -> false