Fix folding deleted entries
There was a couple of bugs found by Triq, which exhibited bugs in folding.
This commit is contained in:
parent
70fc4030f6
commit
68114bdbff
4 changed files with 80 additions and 68 deletions
|
@ -32,7 +32,6 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3,
|
||||
async_fold/3, async_fold_range/4,
|
||||
fold/3, fold_range/4]).
|
||||
|
||||
-export([get_opt/2, get_opt/3]).
|
||||
|
@ -46,6 +45,7 @@
|
|||
%% PUBLIC API
|
||||
|
||||
-type hanoi() :: pid().
|
||||
-type key_range() :: #btree_range{}.
|
||||
|
||||
% @doc
|
||||
% Create or open existing hanoi store. Argument `Dir' names a
|
||||
|
@ -98,11 +98,19 @@ put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
|
|||
transact(Ref, TransactionSpec) ->
|
||||
gen_server:call(Ref, {transact, TransactionSpec}, infinity).
|
||||
|
||||
-type kv_fold_fun() :: fun((binary(),binary(),any())->any()).
|
||||
|
||||
-spec fold(hanoi(),kv_fold_fun(),any()) -> any().
|
||||
fold(Ref,Fun,Acc0) ->
|
||||
fold_range(Ref,Fun,Acc0,#btree_range{from_key= <<>>, to_key=undefined}).
|
||||
|
||||
-spec fold_range(hanoi(),kv_fold_fun(),any(),key_range()) -> any().
|
||||
fold_range(Ref,Fun,Acc0,Range) ->
|
||||
{ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity),
|
||||
if Range#btree_range.limit < 10 ->
|
||||
{ok, PID} = gen_server:call(Ref, {blocking_range, self(), Range}, infinity);
|
||||
true ->
|
||||
{ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity)
|
||||
end,
|
||||
MRef = erlang:monitor(process, PID),
|
||||
receive_fold_range(MRef, PID,Fun,Acc0).
|
||||
|
||||
|
@ -116,16 +124,16 @@ receive_fold_range(MRef, PID,Fun,Acc0) ->
|
|||
{ok, Fun(K,V,Acc0)}
|
||||
catch
|
||||
Class:Exception ->
|
||||
io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
|
||||
%% lager:warning("Exception in hanoi fold: ~p", [Exception]),
|
||||
{'EXIT', Class, Exception, erlang:get_stacktrace()}
|
||||
% io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
|
||||
% lager:warn("Exception in hanoi fold: ~p", [Exception]),
|
||||
{'EXIT', Class, Exception}
|
||||
end
|
||||
of
|
||||
{ok, Acc1} ->
|
||||
receive_fold_range(MRef, PID, Fun, Acc1);
|
||||
Exit ->
|
||||
%% kill the fold worker ...
|
||||
erlang:exit(PID, kill),
|
||||
PID ! die,
|
||||
drain_worker_and_throw(MRef,PID,Exit)
|
||||
end;
|
||||
|
||||
|
@ -182,29 +190,6 @@ drain_worker_and_throw(MRef, PID, ExitTuple) ->
|
|||
end.
|
||||
|
||||
|
||||
async_fold(Ref,Fun,Acc0) ->
|
||||
async_fold_range(Ref,Fun,Acc0,#btree_range{ from_key= <<>>, to_key=undefined }).
|
||||
|
||||
async_fold_range(Ref,Fun,Acc0,Range) ->
|
||||
Range2 = Range#btree_range{ limit=?BTREE_ASYNC_CHUNK_SIZE },
|
||||
FoldMoreFun = fun() ->
|
||||
{ok, PID} = gen_server:call(Ref, {snapshot_range, self(), Range}, infinity),
|
||||
async_receive_fold_range(PID,Fun,Acc0,Ref,Range2)
|
||||
end,
|
||||
{async, FoldMoreFun}.
|
||||
|
||||
async_receive_fold_range(PID,Fun,Acc0,Ref,Range) ->
|
||||
receive
|
||||
{fold_result, PID, K,V} ->
|
||||
async_receive_fold_range(PID, Fun, Fun(K,V,Acc0), Ref, Range);
|
||||
{fold_limit, PID, Key} ->
|
||||
Range2 = Range#btree_range{ from_key = Key, from_inclusive=true },
|
||||
async_fold_range(Ref, Fun, Acc0, Range2);
|
||||
{fold_done, PID} ->
|
||||
{ok, Acc0}
|
||||
end.
|
||||
|
||||
|
||||
init([Dir, Opts]) ->
|
||||
case file:read_file_info(Dir) of
|
||||
{ok, #file_info{ type=directory }} ->
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
-module(hanoi_fold_worker).
|
||||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
-define(log(Fmt,Args),ok).
|
||||
|
||||
%%
|
||||
%% This worker is used to merge fold results from individual
|
||||
|
@ -77,6 +78,9 @@ initialize(State, PrefixFolders) ->
|
|||
|
||||
Parent = plain_fsm:info(parent),
|
||||
receive
|
||||
die ->
|
||||
ok;
|
||||
|
||||
{prefix, [_]=Folders} ->
|
||||
initialize(State, Folders);
|
||||
|
||||
|
@ -92,6 +96,7 @@ initialize(State, PrefixFolders) ->
|
|||
|
||||
{'EXIT', Parent, Reason} ->
|
||||
plain_fsm:parent_EXIT(Reason, State)
|
||||
|
||||
end.
|
||||
|
||||
|
||||
|
@ -100,11 +105,17 @@ fill(State, Values, []) ->
|
|||
|
||||
fill(State, Values, [PID|Rest]=PIDs) ->
|
||||
receive
|
||||
die ->
|
||||
ok;
|
||||
|
||||
{level_done, PID} ->
|
||||
?log( "{level_done, ~p}~n", [PID]),
|
||||
fill(State, lists:keydelete(PID, 1, Values), Rest);
|
||||
{level_limit, PID, Key} ->
|
||||
?log( "{level_limit, ~p}~n", [PID]),
|
||||
fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,limit}}), Rest);
|
||||
{level_result, PID, Key, Value} ->
|
||||
?log( "{level_result, ~p, ~p, ...}~n", [PID, Key]),
|
||||
fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest);
|
||||
|
||||
%% gen_fsm handling
|
||||
|
@ -124,10 +135,15 @@ fill(State, Values, [PID|Rest]=PIDs) ->
|
|||
end.
|
||||
|
||||
emit_next(State, []) ->
|
||||
State#state.sendto ! {fold_done, self()},
|
||||
?log( "emit_next ~p~n", [[]]),
|
||||
Msg = {fold_done, self()},
|
||||
Target = State#state.sendto,
|
||||
?log( "~p ! ~p~n", [Target, Msg]),
|
||||
Target ! Msg,
|
||||
end_of_fold(State);
|
||||
|
||||
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
|
||||
?log( "emit_next ~p~n", [Values]),
|
||||
case
|
||||
lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 ->
|
||||
{KV,[P]};
|
||||
|
@ -142,10 +158,12 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
|
|||
{{_, ?TOMBSTONE}, FillFrom} ->
|
||||
fill(State, Values, FillFrom);
|
||||
{{Key, limit}, _} ->
|
||||
?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]),
|
||||
State#state.sendto ! {fold_limit, self(), Key},
|
||||
end_of_fold(State);
|
||||
{{FoundKey, FoundValue}, FillFrom} ->
|
||||
State#state.sendto ! {fold_result, self(), FoundKey, FoundValue},
|
||||
{{Key, Value}, FillFrom} ->
|
||||
?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]),
|
||||
State#state.sendto ! {fold_result, self(), Key, Value},
|
||||
fill(State, Values, FillFrom)
|
||||
end.
|
||||
|
||||
|
|
|
@ -225,17 +225,38 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
|
|||
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
|
||||
Ref = erlang:make_ref(),
|
||||
FoldWorkerPID ! {prefix, [Ref]},
|
||||
lists:foreach(fun({Key,Value}) ->
|
||||
case ?KEY_IN_RANGE(Key,KeyRange) of
|
||||
true ->
|
||||
FoldWorkerPID ! {level_result, Ref, Key, Value};
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
gb_trees:to_list(Cache)),
|
||||
FoldWorkerPID ! {level_done, Ref},
|
||||
case lists:foldl(fun(_,{LastKey,limit}) ->
|
||||
{LastKey,limit};
|
||||
({Key,Value}, {LastKey,Count}) ->
|
||||
case ?KEY_IN_RANGE(Key,KeyRange) of
|
||||
true ->
|
||||
FoldWorkerPID ! {level_result, Ref, Key, Value},
|
||||
case Value of
|
||||
?TOMBSTONE ->
|
||||
{Key, Count};
|
||||
_ ->
|
||||
{Key, decrement(Count)}
|
||||
end;
|
||||
false ->
|
||||
{LastKey, Count}
|
||||
end
|
||||
end,
|
||||
{undefined, KeyRange#btree_range.limit},
|
||||
gb_trees:to_list(Cache))
|
||||
of
|
||||
{LastKey, limit} when LastKey =/= undefined ->
|
||||
FoldWorkerPID ! {level_limit, Ref, LastKey};
|
||||
_ ->
|
||||
FoldWorkerPID ! {level_done, Ref}
|
||||
end,
|
||||
ok.
|
||||
|
||||
set_max_level(Nursery = #nursery{}, MaxLevel) ->
|
||||
Nursery#nursery{ max_level = MaxLevel }.
|
||||
|
||||
decrement(undefined) ->
|
||||
undefined;
|
||||
decrement(1) ->
|
||||
limit;
|
||||
decrement(Number) ->
|
||||
Number-1.
|
||||
|
|
|
@ -197,7 +197,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{tree=Tree}) ->
|
|||
fun() ->
|
||||
fold_list_buckets(<<>>, Tree, FoldBucketsFun, Acc)
|
||||
end,
|
||||
case lists:member(async_fold, Opts) of
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, BucketFolder};
|
||||
false ->
|
||||
|
@ -221,8 +221,9 @@ fold_list_buckets(PrevBucket, Tree, FoldBucketsFun, Acc) ->
|
|||
%% grab next bucket, it's a limit=1 range query :-)
|
||||
case hanoi:fold_range(Tree,
|
||||
fun(BucketKey,_Value,none) ->
|
||||
?log( "IN_FOLDER ~p~n", [BucketKey]),
|
||||
case from_object_key(BucketKey) of
|
||||
{o, Bucket, _Key} ->
|
||||
{Bucket, _Key} ->
|
||||
[Bucket];
|
||||
_ ->
|
||||
none
|
||||
|
@ -232,8 +233,10 @@ fold_list_buckets(PrevBucket, Tree, FoldBucketsFun, Acc) ->
|
|||
Range)
|
||||
of
|
||||
none ->
|
||||
?log( "NO_MORE_BUCKETS~n", []),
|
||||
Acc;
|
||||
[Bucket] ->
|
||||
?log( "NEXT_BUCKET ~p~n", [Bucket]),
|
||||
fold_list_buckets(Bucket, Tree, FoldBucketsFun, FoldBucketsFun(Bucket, Acc))
|
||||
end.
|
||||
|
||||
|
@ -259,20 +262,11 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{tree=Tree}) ->
|
|||
%% Set up the fold...
|
||||
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
|
||||
Range = to_key_range(Limiter),
|
||||
KeyFolder =
|
||||
fun() ->
|
||||
try
|
||||
hanoi:fold_range(Tree, FoldFun, Acc, Range)
|
||||
catch
|
||||
{break, AccFinal} ->
|
||||
AccFinal
|
||||
end
|
||||
end,
|
||||
case lists:member(async_fold, Opts) of
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, KeyFolder};
|
||||
{async, fun() -> hanoi:fold_range(Tree, FoldFun, Acc, Range) end};
|
||||
false ->
|
||||
{ok, KeyFolder()}
|
||||
{ok, hanoi:fold_range(Tree, FoldFun, Acc, Range)}
|
||||
end.
|
||||
|
||||
%% @doc Fold over all the objects for one or all buckets.
|
||||
|
@ -285,14 +279,9 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) ->
|
|||
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
|
||||
ObjectFolder =
|
||||
fun() ->
|
||||
try
|
||||
hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket))
|
||||
catch
|
||||
{break, AccFinal} ->
|
||||
AccFinal
|
||||
end
|
||||
hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket))
|
||||
end,
|
||||
case lists:member(async_fold, Opts) of
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, ObjectFolder};
|
||||
false ->
|
||||
|
@ -398,13 +387,12 @@ fold_keys_fun(_FoldKeysFun, Other) ->
|
|||
%% @private
|
||||
%% Return a function to fold over the objects on this backend
|
||||
fold_objects_fun(FoldObjectsFun, FilterBucket) ->
|
||||
fun(Key, Value, Acc) ->
|
||||
case from_object_key(Key) of
|
||||
fun(StorageKey, Value, Acc) ->
|
||||
?log( "OFOLD: ~p, filter=~p~n", [sext:decode(StorageKey), FilterBucket]),
|
||||
case from_object_key(StorageKey) of
|
||||
{Bucket, Key} when FilterBucket == undefined;
|
||||
Bucket == FilterBucket ->
|
||||
FoldObjectsFun(Bucket, Key, Value, Acc);
|
||||
_ ->
|
||||
Acc
|
||||
FoldObjectsFun(Bucket, Key, Value, Acc)
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
Loading…
Reference in a new issue