From db415322583c397a08dcae4629ecee6bd7f6c79a Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Thu, 14 Jun 2012 18:11:41 +0100 Subject: [PATCH] Allow keys to expire at different points in time as well as due to a global expiry time. --- src/hanoidb.erl | 27 +++++++----- src/hanoidb_fold_worker.erl | 33 +++++++-------- src/hanoidb_nursery.erl | 84 +++++++++++++++++++++---------------- src/hanoidb_reader.erl | 41 ++++++++---------- src/hanoidb_util.erl | 16 +++---- src/hanoidb_writer.erl | 17 ++++---- test/backend_eqc.erl | 2 +- 7 files changed, 114 insertions(+), 106 deletions(-) diff --git a/src/hanoidb.erl b/src/hanoidb.erl index f288565..ff2c4e5 100644 --- a/src/hanoidb.erl +++ b/src/hanoidb.erl @@ -31,7 +31,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3, +-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3, put/4, fold/3, fold_range/4, destroy/1]). -export([get_opt/2, get_opt/3]). @@ -122,7 +122,14 @@ delete(Ref,Key) when is_binary(Key) -> -spec put(hanoidb(), binary(), binary()) -> ok | {error, term()}. put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> - gen_server:call(Ref, {put, Key, Value}, infinity). + gen_server:call(Ref, {put, Key, Value, infinity}, infinity). + +-spec put(hanoidb(), binary(), binary(), integer()) -> + ok | {error, term()}. +put(Ref,Key,Value,infinity) when is_binary(Key), is_binary(Value) -> + gen_server:call(Ref, {put, Key, Value, infinity}, infinity); +put(Ref,Key,Value,Expiry) when is_binary(Key), is_binary(Value) -> + gen_server:call(Ref, {put, Key, Value, Expiry}, infinity). -type transact_spec() :: {put, binary(), binary()} | {delete, binary()}. -spec transact(hanoidb(), [transact_spec()]) -> @@ -371,8 +378,8 @@ handle_call({blocking_range, FoldWorkerPID, Range}, _From, State=#state{ top=Top Result = hanoidb_level:blocking_range(TopLevel, FoldWorkerPID, Range), {reply, Result, State}; -handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> - {ok, State2} = do_put(Key, Value, State), +handle_call({put, Key, Value, Expiry}, _From, State) when is_binary(Key), is_binary(Value) -> + {ok, State2} = do_put(Key, Value, Expiry, State), {reply, ok, State2}; handle_call({transact, TransactionSpec}, _From, State) -> @@ -380,7 +387,7 @@ handle_call({transact, TransactionSpec}, _From, State) -> {reply, ok, State2}; handle_call({delete, Key}, _From, State) when is_binary(Key) -> - {ok, State2} = do_put(Key, ?TOMBSTONE, State), + {ok, State2} = do_put(Key, ?TOMBSTONE, infinity, State), {reply, ok, State2}; handle_call({get, Key}, From, State=#state{ top=Top, nursery=Nursery } ) when is_binary(Key) -> @@ -411,14 +418,14 @@ handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) -> {stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}. -do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) -> - {ok, Nursery2} = hanoidb_nursery:add_maybe_flush(Key, Value, Nursery, Top), - {ok, State#state{ nursery=Nursery2 }}. +do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) -> + {ok, Nursery2} = hanoidb_nursery:add(Key, Value, Expiry, Nursery, Top), + {ok, State#state{nursery=Nursery2}}. do_transact([{put, Key, Value}], State) -> - do_put(Key, Value, State); + do_put(Key, Value, infinity, State); do_transact([{delete, Key}], State) -> - do_put(Key, ?TOMBSTONE, State); + do_put(Key, ?TOMBSTONE, infinity, State); do_transact([], State) -> {ok, State}; do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) -> diff --git a/src/hanoidb_fold_worker.erl b/src/hanoidb_fold_worker.erl index 094bbca..001a985 100644 --- a/src/hanoidb_fold_worker.erl +++ b/src/hanoidb_fold_worker.erl @@ -71,33 +71,30 @@ -record(state, {sendto, sendto_ref}). start(SendTo) -> - PID = plain_fsm:spawn(?MODULE, - fun() -> - ?log("fold_worker started ~p~n", [self()]), - process_flag(trap_exit,true), - MRef = erlang:monitor(process, SendTo), -try - initialize(#state{sendto=SendTo, sendto_ref=MRef}, []), - ?log("fold_worker done ~p~n", [self()]) -catch - Class:Ex -> - ?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]), - error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]), - exit({bad, Class, Ex, erlang:get_stacktrace()}) -end - end), + F = fun() -> + ?log("fold_worker started ~p~n", [self()]), + process_flag(trap_exit, true), + MRef = erlang:monitor(process, SendTo), + try + initialize(#state{sendto=SendTo, sendto_ref=MRef}, []), + ?log("fold_worker done ~p~n", [self()]) + catch + Class:Ex -> + ?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]), + error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]), + exit({bad, Class, Ex, erlang:get_stacktrace()}) + end + end, + PID = plain_fsm:spawn(?MODULE, F), {ok, PID}. - initialize(State, PrefixFolders) -> - Parent = plain_fsm:info(parent), receive {prefix, [_]=Folders} -> initialize(State, Folders); {initialize, Folders} -> - Queues = [ {PID,queue:new()} || PID <- (PrefixFolders ++ Folders) ], Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ], fill(State, Initial, Queues, PrefixFolders ++ Folders); diff --git a/src/hanoidb_nursery.erl b/src/hanoidb_nursery.erl index cb3ae17..8d2edeb 100644 --- a/src/hanoidb_nursery.erl +++ b/src/hanoidb_nursery.erl @@ -25,7 +25,7 @@ -module(hanoidb_nursery). -author('Kresten Krab Thorup '). --export([new/3, recover/4, finish/2, lookup/2, add_maybe_flush/4]). +-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]). -export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]). -include("include/hanoidb.hrl"). @@ -93,39 +93,48 @@ read_nursery_from_log(Directory, MaxLevel, Config) -> Cache = fill_cache(KVs, gb_trees:empty()), {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}. +nursery_full(#nursery{count=Count}=Nursery) when Count + 1 > ?BTREE_SIZE(?TOP_LEVEL) -> + {full, Nursery}; +nursery_full(Nursery) -> + {ok, Nursery}. % @doc % Add a Key/Value to the nursery % @end --spec add(#nursery{}, binary(), binary()|?TOMBSTONE, pid()) -> {ok, #nursery{}}. -add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config }, Key, Value, Top) -> - ExpiryTime = hanoidb_util:expiry_time(Config), - TStamp = hanoidb_util:tstamp(), +-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, pos_integer() | infinity, pid()) -> {ok, #nursery{}}. +do_add(Nursery, Key, Value, infinity, Top) -> + do_add(Nursery, Key, Value, 0, Top); +do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) -> + DatabaseExpiryTime = hanoidb:get_opt(expiry_secs, Config), + + {Data, NewValue} = case KeyExpiryTime + DatabaseExpiryTime of + 0 -> + % Both the database expiry and this key's expiry are unset or set to 0 + % (aka infinity) so never automatically expire the value. + { hanoidb_util:crc_encapsulate_kv_entry(Key, Value), Value }; + _ -> + Expiry = case DatabaseExpiryTime == 0 of + true -> + % It was the database's setting that was 0 so expire this + % value after KeyExpiryTime seconds elapse. + hanoidb_util:expiry_time(KeyExpiryTime); + false -> + case KeyExpiryTime == 0 of + true -> hanoidb_util:expiry_time(DatabaseExpiryTime); + false -> hanoidb_util:expiry_time(min(KeyExpiryTime, DatabaseExpiryTime)) + end + end, + { hanoidb_util:crc_encapsulate_kv_entry(Key, {Value, Expiry}), {Value, Expiry} } + end, - if ExpiryTime == 0 -> - Data = hanoidb_util:crc_encapsulate_kv_entry( Key, Value ); - true -> - Data = hanoidb_util:crc_encapsulate_kv_entry( Key, {Value, TStamp} ) - end, ok = file:write(File, Data), - Nursery1 = do_sync(File, Nursery), - - if ExpiryTime == 0 -> - Cache2 = gb_trees:enter(Key, Value, Cache); - true -> - Cache2 = gb_trees:enter(Key, {Value, TStamp}, Cache) - end, + Cache2 = gb_trees:enter(Key, NewValue, Cache), {ok, Nursery2} = do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count+1 }, 1, Top), - if - Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) -> - {full, Nursery2}; - true -> - {ok, Nursery2} - end. + nursery_full(Nursery2). do_sync(File, Nursery) -> case application:get_env(hanoidb, sync_strategy) of @@ -147,13 +156,13 @@ do_sync(File, Nursery) -> Nursery#nursery{ last_sync = LastSync }. -lookup(Key, #nursery{ cache=Cache, config=Config }) -> +lookup(Key, #nursery{cache=Cache}) -> case gb_trees:lookup(Key, Cache) of {value, {Value, TStamp}} -> - ExpiryTime = hanoidb_util:expiry_time(Config), - if TStamp < ExpiryTime -> + case hanoidb_util:has_expired(TStamp) of + true -> none; - true -> + false -> {value, Value} end; Reply -> @@ -231,8 +240,11 @@ destroy(#nursery{ dir=Dir, log_file=LogFile }) -> ok. -add_maybe_flush(Key, Value, Nursery, Top) -> - case add(Nursery, Key, Value, Top) of +add(Key, Value, Nursery, Top) -> + add(Key, Value, infinity, Nursery, Top). + +add(Key, Value, Expiry, Nursery, Top) -> + case do_add(Nursery, Key, Value, Expiry, Top) of {ok, _Nursery} = OK -> OK; {full, Nursery2} -> @@ -285,14 +297,13 @@ do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) -> {ok, Nursery#nursery{ step=Step+N }} end. -do_level_fold(#nursery{ cache=Cache, config=Config }, FoldWorkerPID, KeyRange) -> +do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) -> Ref = erlang:make_ref(), FoldWorkerPID ! {prefix, [Ref]}, - ExpiryTime = hanoidb_util:expiry_time(Config), case lists:foldl(fun(_,{LastKey,limit}) -> {LastKey,limit}; ({Key,Value}, {LastKey,Count}) -> - IsExpired = is_expired(Value, ExpiryTime), + IsExpired = is_expired(Value), case ?KEY_IN_RANGE(Key,KeyRange) andalso (not IsExpired) of true -> @@ -330,11 +341,12 @@ decrement(Number) -> %%% -is_expired({_Value, TStamp}, ExpiryTime) -> - TStamp < ExpiryTime ; -is_expired(?TOMBSTONE,_) -> +% TODO this is duplicate code also found in hanoidb_reader +is_expired(?TOMBSTONE) -> false; -is_expired(Bin,_) when is_binary(Bin) -> +is_expired({_Value, TStamp}) -> + hanoidb_util:has_expired(TStamp); +is_expired(Bin) when is_binary(Bin) -> false. get_value({Value, TStamp}) when is_integer(TStamp) -> diff --git a/src/hanoidb_reader.erl b/src/hanoidb_reader.erl index d52aa52..cd902ff 100644 --- a/src/hanoidb_reader.erl +++ b/src/hanoidb_reader.erl @@ -117,15 +117,14 @@ fold1(File,Fun,Acc0) -> fold0(File,Fun,Node,Acc0) end. -range_fold(Fun, Acc0, #index{file=File,root=Root,config=Options}, Range) -> - ExpiryTime = hanoidb_util:expiry_time(Options), +range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) -> case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of {ok, {Pos,_}} -> file:position(File, Pos), - do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime); + do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit); {ok, Pos} -> file:position(File, Pos), - do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime); + do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit); none -> {done, Acc0} end. @@ -140,11 +139,12 @@ fold_until_stop2(_Fun,{continue, Acc},[]) -> fold_until_stop2(Fun,{continue, Acc},[H|T]) -> fold_until_stop2(Fun,Fun(H,Acc),T). -is_expired({_Value, TStamp}, ExpiryTime) -> - TStamp < ExpiryTime ; -is_expired(?TOMBSTONE,_) -> +% TODO this is duplicate code also found in hanoidb_nursery +is_expired(?TOMBSTONE) -> false; -is_expired(Bin,_) when is_binary(Bin) -> +is_expired({_Value, TStamp}) -> + hanoidb_util:has_expired(TStamp); +is_expired(Bin) when is_binary(Bin) -> false. get_value({Value, _TStamp}) -> @@ -152,14 +152,7 @@ get_value({Value, _TStamp}) -> get_value(Value) -> Value. -get_tstamp({_Value, TStamp}) -> - TStamp; -get_tstamp(_) -> - hanoidb_util:tstamp(). - - - -do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) -> +do_range_fold(Fun, Acc0, File, Range, undefined) -> case next_leaf_node(File) of eof -> {done, Acc0}; @@ -168,7 +161,7 @@ do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) -> case fold_until_stop(fun({Key,_}, Acc) when not ?KEY_IN_TO_RANGE(Key,Range) -> {stop, {done, Acc}}; ({Key,Value}, Acc) when ?KEY_IN_FROM_RANGE(Key, Range) -> - case is_expired(Value, ExpiryTime) of + case is_expired(Value) of true -> {continue, Acc}; false -> @@ -181,11 +174,11 @@ do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) -> Members) of {stopped, Result} -> Result; {ok, Acc1} -> - do_range_fold(Fun, Acc1, File, Range, undefined, ExpiryTime) + do_range_fold(Fun, Acc1, File, Range, undefined) end end; -do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) -> +do_range_fold(Fun, Acc0, File, Range, N0) -> case next_leaf_node(File) of eof -> {done, Acc0}; @@ -198,14 +191,14 @@ do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) -> ({Key,?TOMBSTONE}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) -> {continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}}; ({Key,{?TOMBSTONE,TStamp}}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) -> - case TStamp < ExpiryTime of + case hanoidb_utils:has_expired(TStamp) of true -> {continue, {N1,Acc}}; false -> {continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}} end; ({Key,Value}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) -> - case is_expired(Value, ExpiryTime) of + case is_expired(Value) of true -> {continue, {N1,Acc}}; false -> @@ -218,7 +211,7 @@ do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) -> Members) of {stopped, Result} -> Result; {ok, {N2, Acc1}} -> - do_range_fold(Fun, Acc1, File, Range, N2, ExpiryTime) + do_range_fold(Fun, Acc1, File, Range, N2) end end. @@ -261,14 +254,14 @@ close(#index{file=File}) -> file:close(File). -lookup(#index{file=File, root=Node, bloom=Bloom, config=Config }, Key) -> +lookup(#index{file=File, root=Node, bloom=Bloom}, Key) -> case ebloom:contains(Bloom, Key) of true -> case lookup_in_node(File,Node,Key) of not_found -> not_found; {ok, {Value, TStamp}} ?ASSERT_WHEN(Value =:= ?TOMBSTONE; is_binary(Value)) -> - case TStamp < hanoidb_util:expiry_time(Config) of + case hanoidb_utils:has_expired(TStamp) of true -> not_found; false -> {ok, Value} end; diff --git a/src/hanoidb_util.erl b/src/hanoidb_util.erl index 5e852f3..e77a816 100644 --- a/src/hanoidb_util.erl +++ b/src/hanoidb_util.erl @@ -226,14 +226,14 @@ tstamp() -> {Mega, Sec, _Micro} = os:timestamp(), (Mega * 1000000) + Sec. -%% @doc Return time when values expire (i.e. Now - ExpirySecs), or 0. --spec expiry_time([_]) -> pos_integer(). -expiry_time(Opts) -> - ExpirySecs = hanoidb:get_opt(expiry_secs, Opts), - case ExpirySecs > 0 of - true -> tstamp() - ExpirySecs; - false -> 0 - end. +%% @doc Return time when values expire (i.e. Now + ExpirySecs), or 0. +-spec expiry_time(pos_integer()) -> pos_integer(). +expiry_time(ExpirySecs) when ExpirySecs > 0 -> + tstamp() + ExpirySecs. + +-spec has_expired(pos_integer()) -> true|false. +has_expired(Expiration) when Expiration > 0 -> + Expiration < tstamp(). ensure_expiry(Opts) -> case hanoidb:get_opt(expiry_secs, Opts) of diff --git a/src/hanoidb_writer.erl b/src/hanoidb_writer.erl index 510f315..16effea 100644 --- a/src/hanoidb_writer.erl +++ b/src/hanoidb_writer.erl @@ -57,7 +57,6 @@ block_size = ?NODE_SIZE, compress = none :: none | snappy | gzip, opts = [], - expiry_time, value_count = 0, tombstone_count = 0 @@ -103,8 +102,7 @@ init([Name,Options]) -> bloom = BloomFilter, block_size = BlockSize, compress = hanoidb:get_opt(compress, Options, none), - opts = Options, - expiry_time = hanoidb_util:expiry_time(Options) + opts = Options }}; {error, _}=Error -> error_logger:error_msg("hanoidb_writer cannot open ~p: ~p~n", [Name, Error]), @@ -113,12 +111,13 @@ init([Name,Options]) -> handle_cast({add, Key, {Data, TStamp}=Record}, State) when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE)-> - if TStamp < State#state.expiry_time -> - State2 = State; - true -> - {ok, State2} = add_record(0, Key, Record, State) - end, - {noreply, State2}; + case hanoidb_util:has_expired(TStamp) of + true -> + {ok, State2} = add_record(0, Key, Record, State), + {noreply, State2}; + false -> + {noreply, State} + end; handle_cast({add, Key, Data}, State) when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE)-> {ok, State2} = add_record(0, Key, Data, State), diff --git a/test/backend_eqc.erl b/test/backend_eqc.erl index f5cf3c1..6935c17 100644 --- a/test/backend_eqc.erl +++ b/test/backend_eqc.erl @@ -83,7 +83,7 @@ test(Backend, Volatile, Config, Cleanup) -> test(Backend, Volatile, Config, Cleanup, NumTests) -> triq:check(%triq:numtests(NumTests, prop_backend(Backend, Volatile, Config, Cleanup) - %% ) +% ) ). %% ====================================================================