Allow keys to expire at different points in time as well as due to a global expiry time.
This commit is contained in:
parent
b6987c1e47
commit
db41532258
7 changed files with 114 additions and 106 deletions
|
@ -31,7 +31,7 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3,
|
||||
-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3, put/4,
|
||||
fold/3, fold_range/4, destroy/1]).
|
||||
|
||||
-export([get_opt/2, get_opt/3]).
|
||||
|
@ -122,7 +122,14 @@ delete(Ref,Key) when is_binary(Key) ->
|
|||
-spec put(hanoidb(), binary(), binary()) ->
|
||||
ok | {error, term()}.
|
||||
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
|
||||
gen_server:call(Ref, {put, Key, Value}, infinity).
|
||||
gen_server:call(Ref, {put, Key, Value, infinity}, infinity).
|
||||
|
||||
-spec put(hanoidb(), binary(), binary(), integer()) ->
|
||||
ok | {error, term()}.
|
||||
put(Ref,Key,Value,infinity) when is_binary(Key), is_binary(Value) ->
|
||||
gen_server:call(Ref, {put, Key, Value, infinity}, infinity);
|
||||
put(Ref,Key,Value,Expiry) when is_binary(Key), is_binary(Value) ->
|
||||
gen_server:call(Ref, {put, Key, Value, Expiry}, infinity).
|
||||
|
||||
-type transact_spec() :: {put, binary(), binary()} | {delete, binary()}.
|
||||
-spec transact(hanoidb(), [transact_spec()]) ->
|
||||
|
@ -371,8 +378,8 @@ handle_call({blocking_range, FoldWorkerPID, Range}, _From, State=#state{ top=Top
|
|||
Result = hanoidb_level:blocking_range(TopLevel, FoldWorkerPID, Range),
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->
|
||||
{ok, State2} = do_put(Key, Value, State),
|
||||
handle_call({put, Key, Value, Expiry}, _From, State) when is_binary(Key), is_binary(Value) ->
|
||||
{ok, State2} = do_put(Key, Value, Expiry, State),
|
||||
{reply, ok, State2};
|
||||
|
||||
handle_call({transact, TransactionSpec}, _From, State) ->
|
||||
|
@ -380,7 +387,7 @@ handle_call({transact, TransactionSpec}, _From, State) ->
|
|||
{reply, ok, State2};
|
||||
|
||||
handle_call({delete, Key}, _From, State) when is_binary(Key) ->
|
||||
{ok, State2} = do_put(Key, ?TOMBSTONE, State),
|
||||
{ok, State2} = do_put(Key, ?TOMBSTONE, infinity, State),
|
||||
{reply, ok, State2};
|
||||
|
||||
handle_call({get, Key}, From, State=#state{ top=Top, nursery=Nursery } ) when is_binary(Key) ->
|
||||
|
@ -411,14 +418,14 @@ handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
|
|||
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
|
||||
|
||||
|
||||
do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
|
||||
{ok, Nursery2} = hanoidb_nursery:add_maybe_flush(Key, Value, Nursery, Top),
|
||||
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) ->
|
||||
{ok, Nursery2} = hanoidb_nursery:add(Key, Value, Expiry, Nursery, Top),
|
||||
{ok, State#state{nursery=Nursery2}}.
|
||||
|
||||
do_transact([{put, Key, Value}], State) ->
|
||||
do_put(Key, Value, State);
|
||||
do_put(Key, Value, infinity, State);
|
||||
do_transact([{delete, Key}], State) ->
|
||||
do_put(Key, ?TOMBSTONE, State);
|
||||
do_put(Key, ?TOMBSTONE, infinity, State);
|
||||
do_transact([], State) ->
|
||||
{ok, State};
|
||||
do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
|
||||
|
|
|
@ -71,8 +71,7 @@
|
|||
-record(state, {sendto, sendto_ref}).
|
||||
|
||||
start(SendTo) ->
|
||||
PID = plain_fsm:spawn(?MODULE,
|
||||
fun() ->
|
||||
F = fun() ->
|
||||
?log("fold_worker started ~p~n", [self()]),
|
||||
process_flag(trap_exit, true),
|
||||
MRef = erlang:monitor(process, SendTo),
|
||||
|
@ -85,19 +84,17 @@ catch
|
|||
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
|
||||
exit({bad, Class, Ex, erlang:get_stacktrace()})
|
||||
end
|
||||
end),
|
||||
end,
|
||||
PID = plain_fsm:spawn(?MODULE, F),
|
||||
{ok, PID}.
|
||||
|
||||
|
||||
initialize(State, PrefixFolders) ->
|
||||
|
||||
Parent = plain_fsm:info(parent),
|
||||
receive
|
||||
{prefix, [_]=Folders} ->
|
||||
initialize(State, Folders);
|
||||
|
||||
{initialize, Folders} ->
|
||||
|
||||
Queues = [ {PID,queue:new()} || PID <- (PrefixFolders ++ Folders) ],
|
||||
Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ],
|
||||
fill(State, Initial, Queues, PrefixFolders ++ Folders);
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-module(hanoidb_nursery).
|
||||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
-export([new/3, recover/4, finish/2, lookup/2, add_maybe_flush/4]).
|
||||
-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]).
|
||||
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
|
||||
|
||||
-include("include/hanoidb.hrl").
|
||||
|
@ -93,39 +93,48 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
|
|||
Cache = fill_cache(KVs, gb_trees:empty()),
|
||||
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
|
||||
|
||||
nursery_full(#nursery{count=Count}=Nursery) when Count + 1 > ?BTREE_SIZE(?TOP_LEVEL) ->
|
||||
{full, Nursery};
|
||||
nursery_full(Nursery) ->
|
||||
{ok, Nursery}.
|
||||
|
||||
% @doc
|
||||
% Add a Key/Value to the nursery
|
||||
% @end
|
||||
-spec add(#nursery{}, binary(), binary()|?TOMBSTONE, pid()) -> {ok, #nursery{}}.
|
||||
add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config }, Key, Value, Top) ->
|
||||
ExpiryTime = hanoidb_util:expiry_time(Config),
|
||||
TStamp = hanoidb_util:tstamp(),
|
||||
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, pos_integer() | infinity, pid()) -> {ok, #nursery{}}.
|
||||
do_add(Nursery, Key, Value, infinity, Top) ->
|
||||
do_add(Nursery, Key, Value, 0, Top);
|
||||
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) ->
|
||||
DatabaseExpiryTime = hanoidb:get_opt(expiry_secs, Config),
|
||||
|
||||
if ExpiryTime == 0 ->
|
||||
Data = hanoidb_util:crc_encapsulate_kv_entry( Key, Value );
|
||||
{Data, NewValue} = case KeyExpiryTime + DatabaseExpiryTime of
|
||||
0 ->
|
||||
% Both the database expiry and this key's expiry are unset or set to 0
|
||||
% (aka infinity) so never automatically expire the value.
|
||||
{ hanoidb_util:crc_encapsulate_kv_entry(Key, Value), Value };
|
||||
_ ->
|
||||
Expiry = case DatabaseExpiryTime == 0 of
|
||||
true ->
|
||||
Data = hanoidb_util:crc_encapsulate_kv_entry( Key, {Value, TStamp} )
|
||||
% It was the database's setting that was 0 so expire this
|
||||
% value after KeyExpiryTime seconds elapse.
|
||||
hanoidb_util:expiry_time(KeyExpiryTime);
|
||||
false ->
|
||||
case KeyExpiryTime == 0 of
|
||||
true -> hanoidb_util:expiry_time(DatabaseExpiryTime);
|
||||
false -> hanoidb_util:expiry_time(min(KeyExpiryTime, DatabaseExpiryTime))
|
||||
end
|
||||
end,
|
||||
{ hanoidb_util:crc_encapsulate_kv_entry(Key, {Value, Expiry}), {Value, Expiry} }
|
||||
end,
|
||||
|
||||
ok = file:write(File, Data),
|
||||
|
||||
Nursery1 = do_sync(File, Nursery),
|
||||
|
||||
if ExpiryTime == 0 ->
|
||||
Cache2 = gb_trees:enter(Key, Value, Cache);
|
||||
true ->
|
||||
Cache2 = gb_trees:enter(Key, {Value, TStamp}, Cache)
|
||||
end,
|
||||
Cache2 = gb_trees:enter(Key, NewValue, Cache),
|
||||
|
||||
{ok, Nursery2} =
|
||||
do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data),
|
||||
count=Count+1 }, 1, Top),
|
||||
if
|
||||
Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
|
||||
{full, Nursery2};
|
||||
true ->
|
||||
{ok, Nursery2}
|
||||
end.
|
||||
nursery_full(Nursery2).
|
||||
|
||||
do_sync(File, Nursery) ->
|
||||
case application:get_env(hanoidb, sync_strategy) of
|
||||
|
@ -147,13 +156,13 @@ do_sync(File, Nursery) ->
|
|||
Nursery#nursery{ last_sync = LastSync }.
|
||||
|
||||
|
||||
lookup(Key, #nursery{ cache=Cache, config=Config }) ->
|
||||
lookup(Key, #nursery{cache=Cache}) ->
|
||||
case gb_trees:lookup(Key, Cache) of
|
||||
{value, {Value, TStamp}} ->
|
||||
ExpiryTime = hanoidb_util:expiry_time(Config),
|
||||
if TStamp < ExpiryTime ->
|
||||
none;
|
||||
case hanoidb_util:has_expired(TStamp) of
|
||||
true ->
|
||||
none;
|
||||
false ->
|
||||
{value, Value}
|
||||
end;
|
||||
Reply ->
|
||||
|
@ -231,8 +240,11 @@ destroy(#nursery{ dir=Dir, log_file=LogFile }) ->
|
|||
ok.
|
||||
|
||||
|
||||
add_maybe_flush(Key, Value, Nursery, Top) ->
|
||||
case add(Nursery, Key, Value, Top) of
|
||||
add(Key, Value, Nursery, Top) ->
|
||||
add(Key, Value, infinity, Nursery, Top).
|
||||
|
||||
add(Key, Value, Expiry, Nursery, Top) ->
|
||||
case do_add(Nursery, Key, Value, Expiry, Top) of
|
||||
{ok, _Nursery} = OK ->
|
||||
OK;
|
||||
{full, Nursery2} ->
|
||||
|
@ -285,14 +297,13 @@ do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
|
|||
{ok, Nursery#nursery{ step=Step+N }}
|
||||
end.
|
||||
|
||||
do_level_fold(#nursery{ cache=Cache, config=Config }, FoldWorkerPID, KeyRange) ->
|
||||
do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) ->
|
||||
Ref = erlang:make_ref(),
|
||||
FoldWorkerPID ! {prefix, [Ref]},
|
||||
ExpiryTime = hanoidb_util:expiry_time(Config),
|
||||
case lists:foldl(fun(_,{LastKey,limit}) ->
|
||||
{LastKey,limit};
|
||||
({Key,Value}, {LastKey,Count}) ->
|
||||
IsExpired = is_expired(Value, ExpiryTime),
|
||||
IsExpired = is_expired(Value),
|
||||
|
||||
case ?KEY_IN_RANGE(Key,KeyRange) andalso (not IsExpired) of
|
||||
true ->
|
||||
|
@ -330,11 +341,12 @@ decrement(Number) ->
|
|||
|
||||
%%%
|
||||
|
||||
is_expired({_Value, TStamp}, ExpiryTime) ->
|
||||
TStamp < ExpiryTime ;
|
||||
is_expired(?TOMBSTONE,_) ->
|
||||
% TODO this is duplicate code also found in hanoidb_reader
|
||||
is_expired(?TOMBSTONE) ->
|
||||
false;
|
||||
is_expired(Bin,_) when is_binary(Bin) ->
|
||||
is_expired({_Value, TStamp}) ->
|
||||
hanoidb_util:has_expired(TStamp);
|
||||
is_expired(Bin) when is_binary(Bin) ->
|
||||
false.
|
||||
|
||||
get_value({Value, TStamp}) when is_integer(TStamp) ->
|
||||
|
|
|
@ -117,15 +117,14 @@ fold1(File,Fun,Acc0) ->
|
|||
fold0(File,Fun,Node,Acc0)
|
||||
end.
|
||||
|
||||
range_fold(Fun, Acc0, #index{file=File,root=Root,config=Options}, Range) ->
|
||||
ExpiryTime = hanoidb_util:expiry_time(Options),
|
||||
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
|
||||
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
|
||||
{ok, {Pos,_}} ->
|
||||
file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime);
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
|
||||
{ok, Pos} ->
|
||||
file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime);
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
|
||||
none ->
|
||||
{done, Acc0}
|
||||
end.
|
||||
|
@ -140,11 +139,12 @@ fold_until_stop2(_Fun,{continue, Acc},[]) ->
|
|||
fold_until_stop2(Fun,{continue, Acc},[H|T]) ->
|
||||
fold_until_stop2(Fun,Fun(H,Acc),T).
|
||||
|
||||
is_expired({_Value, TStamp}, ExpiryTime) ->
|
||||
TStamp < ExpiryTime ;
|
||||
is_expired(?TOMBSTONE,_) ->
|
||||
% TODO this is duplicate code also found in hanoidb_nursery
|
||||
is_expired(?TOMBSTONE) ->
|
||||
false;
|
||||
is_expired(Bin,_) when is_binary(Bin) ->
|
||||
is_expired({_Value, TStamp}) ->
|
||||
hanoidb_util:has_expired(TStamp);
|
||||
is_expired(Bin) when is_binary(Bin) ->
|
||||
false.
|
||||
|
||||
get_value({Value, _TStamp}) ->
|
||||
|
@ -152,14 +152,7 @@ get_value({Value, _TStamp}) ->
|
|||
get_value(Value) ->
|
||||
Value.
|
||||
|
||||
get_tstamp({_Value, TStamp}) ->
|
||||
TStamp;
|
||||
get_tstamp(_) ->
|
||||
hanoidb_util:tstamp().
|
||||
|
||||
|
||||
|
||||
do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) ->
|
||||
do_range_fold(Fun, Acc0, File, Range, undefined) ->
|
||||
case next_leaf_node(File) of
|
||||
eof ->
|
||||
{done, Acc0};
|
||||
|
@ -168,7 +161,7 @@ do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) ->
|
|||
case fold_until_stop(fun({Key,_}, Acc) when not ?KEY_IN_TO_RANGE(Key,Range) ->
|
||||
{stop, {done, Acc}};
|
||||
({Key,Value}, Acc) when ?KEY_IN_FROM_RANGE(Key, Range) ->
|
||||
case is_expired(Value, ExpiryTime) of
|
||||
case is_expired(Value) of
|
||||
true ->
|
||||
{continue, Acc};
|
||||
false ->
|
||||
|
@ -181,11 +174,11 @@ do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) ->
|
|||
Members) of
|
||||
{stopped, Result} -> Result;
|
||||
{ok, Acc1} ->
|
||||
do_range_fold(Fun, Acc1, File, Range, undefined, ExpiryTime)
|
||||
do_range_fold(Fun, Acc1, File, Range, undefined)
|
||||
end
|
||||
end;
|
||||
|
||||
do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) ->
|
||||
do_range_fold(Fun, Acc0, File, Range, N0) ->
|
||||
case next_leaf_node(File) of
|
||||
eof ->
|
||||
{done, Acc0};
|
||||
|
@ -198,14 +191,14 @@ do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) ->
|
|||
({Key,?TOMBSTONE}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
|
||||
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}};
|
||||
({Key,{?TOMBSTONE,TStamp}}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
|
||||
case TStamp < ExpiryTime of
|
||||
case hanoidb_utils:has_expired(TStamp) of
|
||||
true ->
|
||||
{continue, {N1,Acc}};
|
||||
false ->
|
||||
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}}
|
||||
end;
|
||||
({Key,Value}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
|
||||
case is_expired(Value, ExpiryTime) of
|
||||
case is_expired(Value) of
|
||||
true ->
|
||||
{continue, {N1,Acc}};
|
||||
false ->
|
||||
|
@ -218,7 +211,7 @@ do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) ->
|
|||
Members) of
|
||||
{stopped, Result} -> Result;
|
||||
{ok, {N2, Acc1}} ->
|
||||
do_range_fold(Fun, Acc1, File, Range, N2, ExpiryTime)
|
||||
do_range_fold(Fun, Acc1, File, Range, N2)
|
||||
end
|
||||
end.
|
||||
|
||||
|
@ -261,14 +254,14 @@ close(#index{file=File}) ->
|
|||
file:close(File).
|
||||
|
||||
|
||||
lookup(#index{file=File, root=Node, bloom=Bloom, config=Config }, Key) ->
|
||||
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
|
||||
case ebloom:contains(Bloom, Key) of
|
||||
true ->
|
||||
case lookup_in_node(File,Node,Key) of
|
||||
not_found ->
|
||||
not_found;
|
||||
{ok, {Value, TStamp}} ?ASSERT_WHEN(Value =:= ?TOMBSTONE; is_binary(Value)) ->
|
||||
case TStamp < hanoidb_util:expiry_time(Config) of
|
||||
case hanoidb_utils:has_expired(TStamp) of
|
||||
true -> not_found;
|
||||
false -> {ok, Value}
|
||||
end;
|
||||
|
|
|
@ -226,14 +226,14 @@ tstamp() ->
|
|||
{Mega, Sec, _Micro} = os:timestamp(),
|
||||
(Mega * 1000000) + Sec.
|
||||
|
||||
%% @doc Return time when values expire (i.e. Now - ExpirySecs), or 0.
|
||||
-spec expiry_time([_]) -> pos_integer().
|
||||
expiry_time(Opts) ->
|
||||
ExpirySecs = hanoidb:get_opt(expiry_secs, Opts),
|
||||
case ExpirySecs > 0 of
|
||||
true -> tstamp() - ExpirySecs;
|
||||
false -> 0
|
||||
end.
|
||||
%% @doc Return time when values expire (i.e. Now + ExpirySecs), or 0.
|
||||
-spec expiry_time(pos_integer()) -> pos_integer().
|
||||
expiry_time(ExpirySecs) when ExpirySecs > 0 ->
|
||||
tstamp() + ExpirySecs.
|
||||
|
||||
-spec has_expired(pos_integer()) -> true|false.
|
||||
has_expired(Expiration) when Expiration > 0 ->
|
||||
Expiration < tstamp().
|
||||
|
||||
ensure_expiry(Opts) ->
|
||||
case hanoidb:get_opt(expiry_secs, Opts) of
|
||||
|
|
|
@ -57,7 +57,6 @@
|
|||
block_size = ?NODE_SIZE,
|
||||
compress = none :: none | snappy | gzip,
|
||||
opts = [],
|
||||
expiry_time,
|
||||
|
||||
value_count = 0,
|
||||
tombstone_count = 0
|
||||
|
@ -103,8 +102,7 @@ init([Name,Options]) ->
|
|||
bloom = BloomFilter,
|
||||
block_size = BlockSize,
|
||||
compress = hanoidb:get_opt(compress, Options, none),
|
||||
opts = Options,
|
||||
expiry_time = hanoidb_util:expiry_time(Options)
|
||||
opts = Options
|
||||
}};
|
||||
{error, _}=Error ->
|
||||
error_logger:error_msg("hanoidb_writer cannot open ~p: ~p~n", [Name, Error]),
|
||||
|
@ -113,12 +111,13 @@ init([Name,Options]) ->
|
|||
|
||||
|
||||
handle_cast({add, Key, {Data, TStamp}=Record}, State) when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE)->
|
||||
if TStamp < State#state.expiry_time ->
|
||||
State2 = State;
|
||||
case hanoidb_util:has_expired(TStamp) of
|
||||
true ->
|
||||
{ok, State2} = add_record(0, Key, Record, State)
|
||||
end,
|
||||
{ok, State2} = add_record(0, Key, Record, State),
|
||||
{noreply, State2};
|
||||
false ->
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
handle_cast({add, Key, Data}, State) when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE)->
|
||||
{ok, State2} = add_record(0, Key, Data, State),
|
||||
|
|
|
@ -83,7 +83,7 @@ test(Backend, Volatile, Config, Cleanup) ->
|
|||
test(Backend, Volatile, Config, Cleanup, NumTests) ->
|
||||
triq:check(%triq:numtests(NumTests,
|
||||
prop_backend(Backend, Volatile, Config, Cleanup)
|
||||
%% )
|
||||
% )
|
||||
).
|
||||
|
||||
%% ====================================================================
|
||||
|
|
Loading…
Reference in a new issue