Fix folding

The "blocking range fold" only works for modest
data sets, otherwise it gets prohibitively slow,
so for now we always do "snapshot range fold".
This commit is contained in:
Kresten Krab Thorup 2012-04-23 02:10:18 +02:00
parent ca98b124ff
commit 3b451d5863
4 changed files with 30 additions and 27 deletions

View file

@ -32,7 +32,8 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export([open/1, close/1, get/2, lookup/2, delete/2, put/3, -export([open/1, close/1, get/2, lookup/2, delete/2, put/3,
async_range/2, async_fold_range/4, sync_range/2, sync_fold_range/4]). async_fold/3, async_fold_range/4,
fold/3, fold_range/4]).
-include("hanoi.hrl"). -include("hanoi.hrl").
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -69,15 +70,15 @@ delete(Ref,Key) when is_binary(Key) ->
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
gen_server:call(Ref, {put, Key, Value}, infinity). gen_server:call(Ref, {put, Key, Value}, infinity).
sync_range(Ref, #btree_range{}=Range) -> fold(Ref,Fun,Acc0) ->
gen_server:call(Ref, {sync_range, self(), Range}, infinity). fold_range(Ref,Fun,Acc0,#btree_range{from_key= <<>>, to_key=undefined}).
sync_fold_range(Ref,Fun,Acc0,Range) -> fold_range(Ref,Fun,Acc0,Range) ->
{ok, PID} = sync_range(Ref, Range), {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity),
MRef = erlang:monitor(process, PID), MRef = erlang:monitor(process, PID),
sync_receive_fold_range(MRef, PID,Fun,Acc0). receive_fold_range(MRef, PID,Fun,Acc0).
sync_receive_fold_range(MRef, PID,Fun,Acc0) -> receive_fold_range(MRef, PID,Fun,Acc0) ->
receive receive
%% receive one K/V from fold_worker %% receive one K/V from fold_worker
@ -92,7 +93,7 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
end end
of of
{ok, Acc1} -> {ok, Acc1} ->
sync_receive_fold_range(MRef, PID, Fun, Acc1); receive_fold_range(MRef, PID, Fun, Acc1);
Exit -> Exit ->
%% kill the fold worker ... %% kill the fold worker ...
erlang:exit(PID, kill), erlang:exit(PID, kill),
@ -111,7 +112,7 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
end end
of of
{ok, Acc1} -> {ok, Acc1} ->
sync_receive_fold_range(MRef, PID, Fun, Acc1); receive_fold_range(MRef, PID, Fun, Acc1);
Exit -> Exit ->
%% kill the fold worker ... %% kill the fold worker ...
erlang:exit(PID, kill), erlang:exit(PID, kill),
@ -152,13 +153,13 @@ drain_worker_and_throw(MRef, PID, ExitTuple) ->
end. end.
async_range(Ref, #btree_range{}=Range) -> async_fold(Ref,Fun,Acc0) ->
gen_server:call(Ref, {async_range, self(), Range}, infinity). async_fold_range(Ref,Fun,Acc0,#btree_range{ from_key= <<>>, to_key=undefined }).
async_fold_range(Ref,Fun,Acc0,Range) -> async_fold_range(Ref,Fun,Acc0,Range) ->
Range2 = Range#btree_range{ limit=?BTREE_ASYNC_CHUNK_SIZE }, Range2 = Range#btree_range{ limit=?BTREE_ASYNC_CHUNK_SIZE },
FoldMoreFun = fun() -> FoldMoreFun = fun() ->
{ok, PID} = gen_server:call(Ref, {sync_range, self(), Range}, infinity), {ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity),
async_receive_fold_range(PID,Fun,Acc0,Ref,Range2) async_receive_fold_range(PID,Fun,Acc0,Ref,Range2)
end, end,
{async, FoldMoreFun}. {async, FoldMoreFun}.
@ -257,16 +258,16 @@ code_change(_OldVsn, State, _Extra) ->
handle_call({async_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> handle_call({snapshot_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
{ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender), {ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender),
hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range),
Result = hanoi_level:async_range(TopLevel, FoldWorkerPID, Range), Result = hanoi_level:snapshot_range(TopLevel, FoldWorkerPID, Range),
{reply, Result, State}; {reply, Result, State};
handle_call({sync_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> handle_call({blocking_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
{ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender), {ok, FoldWorkerPID} = hanoi_fold_worker:start(Sender),
hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), hanoi_nursery:do_level_fold(Nursery, FoldWorkerPID, Range),
Result = hanoi_level:sync_range(TopLevel, FoldWorkerPID, Range), Result = hanoi_level:blocking_range(TopLevel, FoldWorkerPID, Range),
{reply, Result, State}; {reply, Result, State};
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->

View file

@ -25,8 +25,8 @@
-author('Kresten Krab Thorup <krab@trifork.com>'). -author('Kresten Krab Thorup <krab@trifork.com>').
%% smallest levels are 128 entries %% smallest levels are 256 entries
-define(TOP_LEVEL, 7). -define(TOP_LEVEL, 8).
-define(BTREE_SIZE(Level), (1 bsl (Level))). -define(BTREE_SIZE(Level), (1 bsl (Level))).
-define(TOMBSTONE, 'deleted'). -define(TOMBSTONE, 'deleted').

View file

@ -42,7 +42,7 @@
-behavior(plain_fsm). -behavior(plain_fsm).
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, incremental_merge/2]). -export([open/3, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]).
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -81,14 +81,14 @@ close(Ref) ->
async_range(Ref, FoldWorkerPID, Range) -> snapshot_range(Ref, FoldWorkerPID, Range) ->
proc_lib:spawn(fun() -> proc_lib:spawn(fun() ->
{ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), {ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}),
FoldWorkerPID ! {initialize, Folders} FoldWorkerPID ! {initialize, Folders}
end), end),
{ok, FoldWorkerPID}. {ok, FoldWorkerPID}.
sync_range(Ref, FoldWorkerPID, Range) -> blocking_range(Ref, FoldWorkerPID, Range) ->
{ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), {ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}),
FoldWorkerPID ! {initialize, Folders}, FoldWorkerPID ! {initialize, Folders},
{ok, FoldWorkerPID}. {ok, FoldWorkerPID}.
@ -381,9 +381,10 @@ main_loop(State = #state{ next=Next }) ->
main_loop(State#state{ folding = FoldingPIDs }); main_loop(State#state{ folding = FoldingPIDs });
{range_fold_done, PID, [_,$F|_]=FoldFileName} -> {range_fold_done, PID, FoldFileName} ->
ok = file:delete(FoldFileName), ok = file:delete(FoldFileName),
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); NewFolding = lists:delete(PID,State#state.folding),
main_loop(State#state{ folding = NewFolding });
?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
@ -602,6 +603,7 @@ start_range_fold(FileName, WorkerPID, Range) ->
{ok, File} = hanoi_reader:open(FileName, sequential), {ok, File} = hanoi_reader:open(FileName, sequential),
do_range_fold(File, WorkerPID, self(), Range), do_range_fold(File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID), erlang:unlink(WorkerPID),
hanoi_reader:close(File),
%% this will release the pinning of the fold file %% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName} Owner ! {range_fold_done, self(), FileName}

View file

@ -170,7 +170,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{tree=Tree}) ->
BucketFolder = BucketFolder =
fun() -> fun() ->
try try
hanoi:sync_fold_range(Tree, FoldFun, {Acc, []}, #btree_range{}) hanoi:fold_range(Tree, FoldFun, {Acc, []}, #btree_range{})
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
@ -213,7 +213,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{tree=Tree}) ->
KeyFolder = KeyFolder =
fun() -> fun() ->
try try
hanoi:sync_fold_range(Tree, FoldFun, Acc, Range) hanoi:fold_range(Tree, FoldFun, Acc, Range)
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
@ -250,7 +250,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) ->
ObjectFolder = ObjectFolder =
fun() -> fun() ->
try try
hanoi:sync_fold_range(Tree, FoldFun, Acc, bucket_range(Bucket)) hanoi:fold_range(Tree, FoldFun, Acc, bucket_range(Bucket))
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
@ -276,7 +276,7 @@ is_empty(#state{tree=Tree}) ->
FoldFun = fun(_K, _V, _Acc) -> throw(ok) end, FoldFun = fun(_K, _V, _Acc) -> throw(ok) end,
try try
Range = #btree_range{}, Range = #btree_range{},
[] =:= hanoi:sync_fold_range(Tree, FoldFun, [], Range) [] =:= hanoi:fold_range(Tree, FoldFun, [], Range)
catch catch
_:ok -> _:ok ->
false false