Refactor for expiry_secs option

Tree nodes now hold entries of the form

   {Key, ?TOMBSTONE 
       | BinValue
       | {?TOMBSTONE, TStamp}
       | {BinValue, TStamp}}

We use the form without TStamp when expiry_secs
is unset or set to 0 (i.e., values don't expire).

merger/writer: Move KV count into writer, because
now writer:add determines whether a value is expired,
and thus whether a value is actually written.  Thus,
writer now has a new API function which returns the
KV count written so far.

reader: lookup/fold API hides the TStamp tuples,
so only the next_node API used by the merger
is exposed to these {Key, {_, TStamp}} entries.

nursery: like reader, the TStamp'ed tuples are
not exposed in the client API; expired values
are simply not returned from fold/lookup.

hanoidb: add config option {expiry_secs, N}.

other modules: Make sure that config is passed
all the way down through (sub) processes to be
able to utilize the config option everywhere.

test: update to work with new option.
This commit is contained in:
Kresten Krab Thorup 2012-05-11 12:00:32 +02:00
parent 245d815e4c
commit b6955c9a75
9 changed files with 275 additions and 98 deletions

View file

@ -43,6 +43,8 @@
-record(state, { top, nursery, dir, opt, max_level }).
%% 0 means never expire
-define(DEFAULT_EXPIRY_SECS, 0).
-ifdef(DEBUG).
-define(log(Fmt,Args),io:format(user,Fmt,Args)).
@ -61,6 +63,7 @@
| {write_buffer_size, pos_integer()}
| {merge_strategy, fast | predictable }
| {sync_strategy, none | sync | {seconds, pos_integer()}}
| {expiry_secs, non_neg_integer()}
.
% @doc
@ -241,17 +244,27 @@ drain_worker_and_return(MRef, PID, Value) ->
end.
init([Dir, Opts]) ->
init([Dir, Opts0]) ->
%% ensure expiry_secs option is set in config
case get_opt(expiry_secs, Opts0) of
undefined ->
Opts = [{expiry_secs, ?DEFAULT_EXPIRY_SECS}|Opts0];
N when is_integer(N), N >= 0 ->
Opts = [{expiry_secs, N}|Opts0]
end,
hanoidb_util:ensure_expiry(Opts),
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel, MaxLevel} = open_levels(Dir,Opts),
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MaxLevel);
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MaxLevel, Opts);
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
{ok, TopLevel} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
MaxLevel = ?TOP_LEVEL,
{ok, Nursery} = hanoidb_nursery:new(Dir, MaxLevel)
{ok, Nursery} = hanoidb_nursery:new(Dir, MaxLevel, Opts)
end,
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
@ -412,9 +425,9 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoidb_nursery:transact(TransactionSpec, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.
flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel}) ->
flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
ok = hanoidb_nursery:finish(Nursery, Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->

View file

@ -80,6 +80,7 @@ debug_log(State,Fmt,Args) ->
%%%%% PUBLIC OPERATIONS
open(Dir,Level,Next,Opts,Owner) when Level>0 ->
hanoidb_util:ensure_expiry(Opts),
PID = plain_fsm:spawn(?MODULE,
fun() ->
process_flag(trap_exit,true),
@ -148,6 +149,7 @@ code_change(_OldVsn, _State, _Extra) ->
initialize(State) ->
hanoidb_util:ensure_expiry(State#state.opts),
try
_Result = initialize2(State),

View file

@ -60,14 +60,16 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
{node, AKVs} = hanoidb_reader:first_node(BT1),
{node, BKVs} = hanoidb_reader:first_node(BT2),
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}).
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {0, none}).
terminate(Count, Out) ->
terminate(Out) ->
case ?LOCAL_WRITER of
true ->
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out);
false ->
Count = hanoidb_writer:count(Out),
ok = hanoidb_writer:close(Out)
end,
@ -83,14 +85,14 @@ hibernate_scan(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
scan(hanoidb_reader:deserialize(BT1),
hanoidb_reader:deserialize(BT2),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
IsLastLevel, AKVs, BKVs, {N+HowMany, From})
end.
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
ok;
@ -100,39 +102,39 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AK
receive
{step, From, HowMany} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
case ?LOCAL_WRITER of
true ->
Args = {hanoidb_reader:serialize(BT1),
hanoidb_reader:serialize(BT2),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan(Keep);
false ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, none})
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {0, none})
end
end;
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) ->
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Step) ->
case hanoidb_reader:next_node(BT1) of
{node, AKVs} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(BT1),
scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step)
scan_only(BT2, Out, IsLastLevel, BKVs, Step)
end;
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Step) ->
case hanoidb_reader:next_node(BT2) of
{node, BKVs} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(BT2),
scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step)
scan_only(BT1, Out, IsLastLevel, AKVs, Step)
end;
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count, Step) ->
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Step) ->
if Key1 < Key2 ->
case ?LOCAL_WRITER of
true ->
@ -140,8 +142,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
false ->
ok = hanoidb_writer:add(Out2=Out, Key1, Value1)
end,
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1, step(Step));
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, step(Step));
Key2 < Key1 ->
case ?LOCAL_WRITER of
@ -150,12 +151,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
false ->
ok = hanoidb_writer:add(Out2=Out, Key2, Value2)
end,
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step));
%% cases below have Key1 == Key2, hence it consumes 2 elements
(?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) ->
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step, 2));
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, step(Step));
true ->
case ?LOCAL_WRITER of
@ -164,7 +160,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
false ->
ok = hanoidb_writer:add(Out2=Out, Key2, Value2)
end,
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step, 2))
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, step(Step, 2))
end.
@ -172,14 +168,14 @@ hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{BT, OutBin, IsLastLevel, KVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
{BT, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
scan_only(hanoidb_reader:deserialize(BT),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, Count, {N+HowMany, From})
IsLastLevel, KVs, {N+HowMany, From})
end.
scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= [] ->
scan_only(BT, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
ok;
@ -189,18 +185,18 @@ scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= []
receive
{step, From, HowMany} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From})
scan_only(BT, Out, IsLastLevel, KVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(BT),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, Count, N},
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan_only(Keep)
end;
scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) ->
scan_only(BT, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case hanoidb_reader:next_node(BT) of
{node, KVs} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, Step);
scan_only(BT, Out, IsLastLevel, KVs, Step);
end_of_data ->
case FromPID of
none ->
@ -209,17 +205,17 @@ scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) ->
PID ! {Ref, step_done}
end,
hanoidb_reader:close(BT),
terminate(Count, Out)
terminate(Out)
end;
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) ->
scan_only(BT, Out, true, Rest, Count, step(Step));
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Step) ->
scan_only(BT, Out, true, Rest, step(Step));
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count, Step) ->
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Step) ->
case ?LOCAL_WRITER of
true ->
{noreply, Out2} = hanoidb_writer:handle_cast({add, Key, Value}, Out);
false ->
ok = hanoidb_writer:add(Out2=Out, Key, Value)
end,
scan_only(BT, Out2, IsLastLevel, Rest, Count+1, step(Step)).
scan_only(BT, Out2, IsLastLevel, Rest, step(Step)).

View file

@ -25,7 +25,7 @@
-module(hanoidb_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([new/3, recover/4, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
-include("include/hanoidb.hrl").
@ -33,32 +33,36 @@
-include_lib("kernel/include/file.hrl").
-record(nursery, { log_file, dir, cache, total_size=0, count=0,
last_sync=now(), max_level }).
last_sync=now(), max_level, config=[] }).
-spec new(string(), integer()) -> {ok, #nursery{}} | {error, term()}.
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
new(Directory, MaxLevel) ->
new(Directory, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
{ok, File} = file:open( ?LOGFILENAME(Directory),
[raw, exclusive, write, delayed_write, append]),
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
max_level=MaxLevel}}.
max_level=MaxLevel, config=Config }}.
recover(Directory, TopLevel, MaxLevel) ->
recover(Directory, TopLevel, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
case file:read_file_info( ?LOGFILENAME(Directory) ) of
{ok, _} ->
ok = do_recover(Directory, TopLevel, MaxLevel),
new(Directory, MaxLevel);
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
new(Directory, MaxLevel, Config);
{error, enoent} ->
new(Directory, MaxLevel)
new(Directory, MaxLevel, Config)
end.
do_recover(Directory, TopLevel, MaxLevel) ->
do_recover(Directory, TopLevel, MaxLevel, Config) ->
%% repair the log file; storing it in nursery2
LogFileName = ?LOGFILENAME(Directory),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel, Config),
ok = finish(Nursery, TopLevel),
@ -70,14 +74,19 @@ do_recover(Directory, TopLevel, MaxLevel) ->
fill_cache({Key,Value}, Cache)
when is_binary(Value); Value =:= ?TOMBSTONE ->
gb_trees:enter(Key, Value, Cache);
fill_cache({Key,{Value,_TStamp}=Entry}, Cache)
when is_binary(Value); Value =:= ?TOMBSTONE ->
gb_trees:enter(Key, Entry, Cache);
fill_cache([], Cache) ->
Cache;
fill_cache(Transaction, Cache) when is_list(Transaction) ->
lists:foldl(fun fill_cache/2, Cache, Transaction).
read_nursery_from_log(Directory, MaxLevel) ->
read_nursery_from_log(Directory, MaxLevel, Config) ->
{ok, LogBinary} = file:read_file( ?LOGFILENAME(Directory) ),
{ok, KVs} = hanoidb_util:decode_crc_data( LogBinary, [], [] ),
Cache = fill_cache(KVs, gb_trees:empty()),
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}.
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
% @doc
@ -85,13 +94,13 @@ read_nursery_from_log(Directory, MaxLevel) ->
% @end
-spec add(#nursery{}, binary(), binary()|?TOMBSTONE) -> {ok, #nursery{}}.
add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count }, Key, Value) ->
Data = hanoidb_util:crc_encapsulate_kv_entry( Key, Value ),
TStamp = hanoidb_util:tstamp(),
Data = hanoidb_util:crc_encapsulate_kv_entry( Key, {Value, TStamp} ),
ok = file:write(File, Data),
Nursery1 = do_sync(File, Nursery),
Cache2 = gb_trees:enter(Key, Value, Cache),
Cache2 = gb_trees:enter(Key, {Value, TStamp}, Cache),
Nursery2 = Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count+1 },
if
Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
@ -120,8 +129,18 @@ do_sync(File, Nursery) ->
Nursery#nursery{ last_sync = LastSync }.
lookup(Key, #nursery{ cache=Cache }) ->
gb_trees:lookup(Key, Cache).
lookup(Key, #nursery{ cache=Cache, config=Config }) ->
case gb_trees:lookup(Key, Cache) of
{value, {Value, TStamp}} ->
ExpiryTime = hanoidb_util:expiry_time(Config),
if TStamp < ExpiryTime ->
none;
true ->
{value, Value}
end;
Reply ->
Reply
end.
% @doc
% Finish this nursery (encode it to a btree, and delete the nursery file)
@ -129,9 +148,11 @@ lookup(Key, #nursery{ cache=Cache }) ->
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
total_size=_TotalSize, count=Count,
max_level=MaxLevel
config=Config
}, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
%% first, close the log file (if it is open)
if LogFile /= undefined ->
ok = file:close(LogFile);
@ -144,7 +165,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size,?BTREE_SIZE(?TOP_LEVEL)},
{compress, none}]),
{compress, none} | Config ]),
try
lists:foreach( fun({Key,Value}) ->
ok = hanoidb_writer:add(BT, Key, Value)
@ -164,9 +185,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
%% issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
hanoidb_level:begin_incremental_merge(TopLevel),
ok;
hanoidb_level:begin_incremental_merge(TopLevel);
_ ->
ok
@ -198,10 +217,10 @@ add_maybe_flush(Key, Value, Nursery, Top) ->
flush(Nursery2, Top)
end.
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) ->
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
ok = finish(Nursery, Top),
{error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")),
hanoidb_nursery:new(Dir, MaxLevel).
hanoidb_nursery:new(Dir, MaxLevel, Config).
has_room(#nursery{ count=Count }, N) ->
(Count+N) < ?BTREE_SIZE(?TOP_LEVEL).
@ -217,15 +236,16 @@ ensure_space(Nursery, NeededRoom, Top) ->
transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSize }, Top) ->
Nursery1 = ensure_space(Nursery, length(Spec), Top),
Data = hanoidb_util:crc_encapsulate_transaction( Spec ),
TStamp = hanoidb_util:tstamp(),
Data = hanoidb_util:crc_encapsulate_transaction( Spec, TStamp ),
ok = file:write(File, Data),
Nursery2 = do_sync(File, Nursery1),
Cache2 = lists:foldl(fun({put, Key, Value}, Cache) ->
gb_trees:enter(Key, Value, Cache);
gb_trees:enter(Key, {Value, TStamp}, Cache);
({delete, Key}, Cache) ->
gb_trees:enter(Key, ?TOMBSTONE, Cache)
gb_trees:enter(Key, {?TOMBSTONE, TStamp}, Cache)
end,
Cache0,
Spec),
@ -235,16 +255,20 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
{ok, Nursery2#nursery{ cache=Cache2, total_size=TotalSize+byte_size(Data), count=Count }}.
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
do_level_fold(#nursery{ cache=Cache, config=Config }, FoldWorkerPID, KeyRange) ->
Ref = erlang:make_ref(),
FoldWorkerPID ! {prefix, [Ref]},
ExpiryTime = hanoidb_util:expiry_time(Config),
case lists:foldl(fun(_,{LastKey,limit}) ->
{LastKey,limit};
({Key,Value}, {LastKey,Count}) ->
case ?KEY_IN_RANGE(Key,KeyRange) of
IsExpired = is_expired(Value, ExpiryTime),
case ?KEY_IN_RANGE(Key,KeyRange) andalso (not IsExpired) of
true ->
FoldWorkerPID ! {level_result, Ref, Key, Value},
case Value of
BinOrTombstone = get_value(Value),
FoldWorkerPID ! {level_result, Ref, Key, BinOrTombstone},
case BinOrTombstone of
?TOMBSTONE ->
{Key, Count};
_ ->
@ -273,3 +297,18 @@ decrement(1) ->
limit;
decrement(Number) ->
Number-1.
%%%
%% Tell whether a cache entry has expired relative to ExpiryTime.
%% Timestamped entries ({Value, TStamp}) are expired when their timestamp
%% is strictly less than ExpiryTime; entries stored without a timestamp
%% (a bare tombstone or a bare binary) never expire.
is_expired({_, EntryTStamp}, ExpiryTime) ->
    EntryTStamp < ExpiryTime;
is_expired(Plain, _ExpiryTime) when Plain =:= ?TOMBSTONE; is_binary(Plain) ->
    false.
%% Strip the timestamp wrapper from a cache entry, returning the bare
%% binary value or tombstone. Entries that carry no timestamp are
%% returned unchanged.
get_value({Payload, TStamp}) when is_integer(TStamp) ->
    Payload;
get_value(Plain) when is_binary(Plain); Plain =:= ?TOMBSTONE ->
    Plain.

View file

@ -30,6 +30,8 @@
-include("hanoidb.hrl").
-include("include/plain_rpc.hrl").
-define(ASSERT_WHEN(X), when X).
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
-export([first_node/1,next_node/1]).
-export([serialize/1, deserialize/1]).
@ -115,14 +117,15 @@ fold1(File,Fun,Acc0) ->
fold0(File,Fun,Node,Acc0)
end.
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
range_fold(Fun, Acc0, #index{file=File,root=Root,config=Options}, Range) ->
ExpiryTime = hanoidb_util:expiry_time(Options),
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime);
{ok, Pos} ->
file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit, ExpiryTime);
none ->
{done, Acc0}
end.
@ -137,7 +140,26 @@ fold_until_stop2(_Fun,{continue, Acc},[]) ->
fold_until_stop2(Fun,{continue, Acc},[H|T]) ->
fold_until_stop2(Fun,Fun(H,Acc),T).
do_range_fold(Fun, Acc0, File, Range, undefined) ->
%% Decide whether a stored entry is expired with respect to ExpiryTime.
%% Only {Value, TStamp} entries can expire (when TStamp is strictly below
%% ExpiryTime); a bare tombstone or bare binary is never expired.
is_expired({_, WrittenAt}, ExpiryTime) ->
    WrittenAt < ExpiryTime;
is_expired(Untimed, _ExpiryTime) when Untimed =:= ?TOMBSTONE; is_binary(Untimed) ->
    false.
%% Return the payload of an entry: the first element of a
%% {Value, TStamp} pair, or the term itself when it is not a pair.
get_value({Payload, _}) -> Payload;
get_value(Plain) -> Plain.
%% Return the timestamp of an entry. Entries of the form {Value, TStamp}
%% yield their stored timestamp; anything else is given the current
%% time as reported by hanoidb_util:tstamp/0.
get_tstamp({_, StoredAt}) -> StoredAt;
get_tstamp(_Untimed) -> hanoidb_util:tstamp().
do_range_fold(Fun, Acc0, File, Range, undefined, ExpiryTime) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -146,7 +168,12 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
case fold_until_stop(fun({Key,_}, Acc) when not ?KEY_IN_TO_RANGE(Key,Range) ->
{stop, {done, Acc}};
({Key,Value}, Acc) when ?KEY_IN_FROM_RANGE(Key, Range) ->
{continue, Fun(Key, Value, Acc)};
case is_expired(Value, ExpiryTime) of
true ->
{continue, Acc};
false ->
{continue, Fun(Key, get_value(Value), Acc)}
end;
(_, Acc) ->
{continue, Acc}
end,
@ -154,11 +181,11 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
Members) of
{stopped, Result} -> Result;
{ok, Acc1} ->
do_range_fold(Fun, Acc1, File, Range, undefined)
do_range_fold(Fun, Acc1, File, Range, undefined, ExpiryTime)
end
end;
do_range_fold(Fun, Acc0, File, Range, N0) ->
do_range_fold(Fun, Acc0, File, Range, N0, ExpiryTime) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -170,8 +197,20 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
{stop, {done, Acc}};
({Key,?TOMBSTONE}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}};
({Key,{?TOMBSTONE,TStamp}}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
case TStamp < ExpiryTime of
true ->
{continue, {N1,Acc}};
false ->
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}}
end;
({Key,Value}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
{continue, {N1-1, Fun(Key, Value, Acc)}};
case is_expired(Value, ExpiryTime) of
true ->
{continue, {N1,Acc}};
false ->
{continue, {N1-1, Fun(Key, get_value(Value), Acc)}}
end;
(_, Acc) ->
{continue, Acc}
end,
@ -179,7 +218,7 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
Members) of
{stopped, Result} -> Result;
{ok, {N2, Acc1}} ->
do_range_fold(Fun, Acc1, File, Range, N2)
do_range_fold(Fun, Acc1, File, Range, N2, ExpiryTime)
end
end.
@ -222,10 +261,20 @@ close(#index{file=File}) ->
file:close(File).
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
lookup(#index{file=File, root=Node, bloom=Bloom, config=Config }, Key) ->
case ebloom:contains(Bloom, Key) of
true ->
lookup_in_node(File,Node,Key);
case lookup_in_node(File,Node,Key) of
not_found ->
not_found;
{ok, {Value, TStamp}} ?ASSERT_WHEN(Value =:= ?TOMBSTONE; is_binary(Value)) ->
case TStamp < hanoidb_util:expiry_time(Config) of
true -> not_found;
false -> {ok, Value}
end;
{ok, Value}=Reply ?ASSERT_WHEN(Value =:= ?TOMBSTONE; is_binary(Value)) ->
Reply
end;
false ->
not_found
end.

View file

@ -36,6 +36,8 @@
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).
-compile({inline, [
@ -122,20 +124,24 @@ decode_index_node(Level, <<Tag, Data/binary>>) ->
{ok, {node, Level, KVList}}.
%% Build the tagged iolist for a single entry and hand it to
%% crc_encapsulate/1. The clause that matches selects the tag:
%%   {?TOMBSTONE, TStamp}  -> ?TAG_DELETED2, 32-bit TStamp, then Key
%%   ?TOMBSTONE            -> ?TAG_DELETED, then Key
%%   {Binary, TStamp}      -> ?TAG_KV_DATA2, 32-bit TStamp, 32-bit key length, Key, Value
%%   Binary                -> ?TAG_KV_DATA, 32-bit key length, Key, Value
%%   {Pos, Len}            -> ?TAG_POSLEN32, 64-bit Pos, 32-bit Len, then Key
%% Clause order matters: the timestamped-value clause is guarded with
%% is_binary(Value), so {Pos, Len} pairs fall through to the last clause.
crc_encapsulate_kv_entry(Key, {?TOMBSTONE, TStamp}) -> % timestamped tombstone
crc_encapsulate( [?TAG_DELETED2, <<TStamp:32>> | Key] );
crc_encapsulate_kv_entry(Key, ?TOMBSTONE) ->
crc_encapsulate( [?TAG_DELETED | Key] );
crc_encapsulate_kv_entry(Key, {Value, TStamp}) when is_binary(Value) ->
crc_encapsulate( [?TAG_KV_DATA2, <<TStamp:32, (byte_size(Key)):32/unsigned>>, Key | Value] );
crc_encapsulate_kv_entry(Key, Value) when is_binary(Value) ->
crc_encapsulate( [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key | Value] );
%% Position/length reference; only lengths below 16#ffffffff are accepted.
crc_encapsulate_kv_entry(Key, {Pos,Len}) when Len < 16#ffffffff ->
crc_encapsulate( [?TAG_POSLEN32, <<Pos:64/unsigned, Len:32/unsigned>>, Key ] ).
crc_encapsulate_transaction(TransactionSpec) ->
crc_encapsulate_transaction(TransactionSpec, TStamp) ->
crc_encapsulate( [?TAG_TRANSACT |
lists:map( fun({delete, Key}) ->
crc_encapsulate_kv_entry(Key, ?TOMBSTONE);
crc_encapsulate_kv_entry(Key, {?TOMBSTONE, TStamp});
({put, Key, Value}) ->
crc_encapsulate_kv_entry(Key, Value)
crc_encapsulate_kv_entry(Key, {Value, TStamp})
end,
TransactionSpec)] ).
@ -200,6 +206,12 @@ decode_kv_data(<<?TAG_KV_DATA, KLen:32/unsigned, Key:KLen/binary, Value/binary >
decode_kv_data(<<?TAG_DELETED, Key/binary>>) ->
{Key, ?TOMBSTONE};
decode_kv_data(<<?TAG_KV_DATA2, TStamp:32/unsigned, KLen:32/unsigned, Key:KLen/binary, Value/binary >>) ->
{Key, {Value, TStamp}};
decode_kv_data(<<?TAG_DELETED2, TStamp:32/unsigned, Key/binary>>) ->
{Key, {?TOMBSTONE, TStamp}};
decode_kv_data(<<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned, Key/binary>>) ->
{Key, {Pos,Len}};
@ -207,5 +219,31 @@ decode_kv_data(<<?TAG_TRANSACT, Rest/binary>>) ->
{ok, TX} = decode_crc_data(Rest, [], []),
TX.
%%%%%%%
%% Current wall-clock time as whole seconds since 1970, derived from
%% os:timestamp/0 (the microsecond component is discarded).
tstamp() ->
    {MegaSecs, Secs, _MicroSecs} = os:timestamp(),
    MegaSecs * 1000000 + Secs.
%% Compute the cut-off timestamp for the expiry_secs option in Opts.
%% When expiry_secs compares greater than 0 the result is "now minus
%% expiry_secs"; otherwise it is 0, which no timestamp is less than in
%% the `TStamp < ExpiryTime' checks, i.e. nothing expires.
expiry_time(Opts) ->
    Secs = hanoidb:get_opt(expiry_secs, Opts),
    if
        Secs > 0 -> tstamp() - Secs;
        true -> 0
    end.
%% Assert that Opts carries a usable expiry_secs option.
%%
%% Returns ok when expiry_secs is a non-negative integer. When the option
%% is missing (get_opt yields undefined) the current call stack is printed
%% to the console and the process exits with expiry_secs_not_set. Any
%% other value fails the case match and crashes the caller.
%%
%% The is_integer/1 test is required: in Erlang's term order every atom,
%% tuple, list and binary compares greater than any number, so a bare
%% `N >= 0' guard would accept e.g. {expiry_secs, foo} and defer the
%% failure to the arithmetic in expiry_time/1.
ensure_expiry(Opts) ->
    case hanoidb:get_opt(expiry_secs, Opts) of
        undefined ->
            %% Raise and immediately catch an exit solely to obtain a
            %% stack trace identifying who forgot to pass the option.
            try exit(err)
            catch
                exit:err ->
                    io:format(user, "~p~n", [erlang:get_stacktrace()])
            end,
            exit(expiry_secs_not_set);
        N when is_integer(N), N >= 0 ->
            ok
    end.

View file

@ -39,7 +39,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, serialize/1, deserialize/1]).
-export([open/1, open/2, add/3,close/1]).
-export([open/1, open/2, add/3, count/1, close/1]).
-record(node, { level, members=[], size=0 }).
@ -56,23 +56,31 @@
bloom,
block_size = ?NODE_SIZE,
compress = none :: none | snappy | gzip,
opts = [],
expiry_time,
opts = []
value_count = 0,
tombstone_count = 0
}).
%%% PUBLIC API
open(Name,Options) ->
hanoidb_util:ensure_expiry(Options),
gen_server:start_link(?MODULE, [Name,Options], []).
open(Name) ->
gen_server:start_link(?MODULE, [Name,[]], []).
gen_server:start_link(?MODULE, [Name,[{expiry_secs,0}]], []).
add(Ref,Key,Data) ->
gen_server:cast(Ref, {add, Key, Data}).
%% @doc Return number of KVs added to this writer so far
count(Ref) ->
gen_server:call(Ref, count, infinity).
close(Ref) ->
gen_server:call(Ref, close, infinity).
@ -81,6 +89,7 @@ close(Ref) ->
init([Name,Options]) ->
hanoidb_util:ensure_expiry(Options),
Size = proplists:get_value(size, Options, 2048),
% io:format("got name: ~p~n", [Name]),
@ -94,7 +103,8 @@ init([Name,Options]) ->
bloom = BloomFilter,
block_size = BlockSize,
compress = hanoidb:get_opt(compress, Options, none),
opts = Options
opts = Options,
expiry_time = hanoidb_util:expiry_time(Options)
}};
{error, _}=Error ->
error_logger:error_msg("hanoidb_writer cannot open ~p: ~p~n", [Name, Error]),
@ -102,10 +112,21 @@ init([Name,Options]) ->
end.
%% Handle an asynchronous {add, Key, Entry} request.
%%
%% Timestamped entries ({Data, TStamp}) whose timestamp is strictly below
%% the writer's expiry_time are dropped and the state is left untouched;
%% all other entries are appended at level 0 via add_record/4.
handle_cast({add, Key, {Data, TStamp} = Entry}, State)
  when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE) ->
    case TStamp < State#state.expiry_time of
        true ->
            %% already expired: do not write it
            {noreply, State};
        false ->
            {ok, NewState} = add_record(0, Key, Entry, State),
            {noreply, NewState}
    end;
handle_cast({add, Key, Data}, State)
  when is_binary(Key), (is_binary(Data) orelse Data == ?TOMBSTONE) ->
    %% entry without timestamp: always written
    {ok, NewState} = add_record(0, Key, Data, State),
    {noreply, NewState}.
%% Synchronous requests.
%%
%% count: yields the number of entries recorded so far, i.e. the sum of
%% the value_count and tombstone_count state fields. The merger invokes
%% this callback directly and matches the result against {ok, Count, _}.
%% NOTE(review): {ok, _, _} is not one of the documented gen_server
%% handle_call return tuples, so a count request that arrives through
%% gen_server:call (as count/1 issues) would presumably be treated as a
%% bad return value -- confirm, and if switching to {reply, _, _} update
%% the direct callers that match on {ok, Count, _} at the same time.
handle_call(count, _From, State = #state{ value_count=VC, tombstone_count=TC }) ->
{ok, VC+TC, State};
%% close: flush any buffered nodes, reply ok and stop the server normally.
handle_call(close, _From, State) ->
{ok, State2} = flush_nodes(State),
{stop,normal,ok,State2}.
@ -182,7 +203,8 @@ flush_nodes(State) ->
flush_nodes(State2).
add_record(Level, Key, Value,
#state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode |RestNodes] }=State) ->
#state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode |RestNodes],
value_count=VC, tombstone_count=TC }=State) ->
%% The top-of-stack node is at the level we wish to insert at.
%% Assert that keys are increasing:
@ -201,8 +223,26 @@ add_record(Level, Key, Value,
ok = ebloom:insert( State#state.bloom, Key ),
if Level == 0 ->
case Value of
?TOMBSTONE ->
TC1 = TC+1,
VC1 = VC;
{?TOMBSTONE, _} ->
TC1 = TC+1,
VC1 = VC;
_ ->
TC1 = TC,
VC1 = VC+1
end;
true ->
TC1 = TC,
VC1 = VC
end,
NodeMembers = [{Key,Value} | List],
State2 = State#state{ nodes=[CurrNode#node{ members=NodeMembers, size=NewSize} | RestNodes] },
State2 = State#state{ nodes=[CurrNode#node{ members=NodeMembers, size=NewSize} | RestNodes],
value_count=VC1, tombstone_count=TC1 },
if
NewSize >= State#state.block_size ->
close_node(State2);

View file

@ -37,7 +37,7 @@ merge_test() ->
file:delete("test2"),
file:delete("test3"),
{ok, BT1} = hanoidb_writer:open("test1"),
{ok, BT1} = hanoidb_writer:open("test1", [{expiry_secs, 0}]),
lists:foldl(fun(N,_) ->
ok = hanoidb_writer:add(BT1, <<N:128>>, <<"data",N:128>>)
end,
@ -46,7 +46,7 @@ merge_test() ->
ok = hanoidb_writer:close(BT1),
{ok, BT2} = hanoidb_writer:open("test2"),
{ok, BT2} = hanoidb_writer:open("test2", [{expiry_secs, 0}]),
lists:foldl(fun(N,_) ->
ok = hanoidb_writer:add(BT2, <<N:128>>, <<"data",N:128>>)
end,
@ -56,7 +56,7 @@ merge_test() ->
self() ! {step, {self(), none}, 2000000000},
{Time,{ok,Count}} = timer:tc(hanoidb_merger, merge, ["test1", "test2", "test3", 10000, true, []]),
{Time,{ok,Count}} = timer:tc(hanoidb_merger, merge, ["test1", "test2", "test3", 10000, true, [{expiry_secs, 0}]]),
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),

View file

@ -51,7 +51,7 @@ simple_test() ->
simple1_test() ->
file:delete("testdata"),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 1024}]),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 1024},{expiry_secs, 0}]),
Max = 1024,
Seq = lists:seq(0, Max),
@ -69,7 +69,7 @@ simple1_test() ->
error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
{ok, IN} = hanoidb_reader:open("testdata"),
{ok, IN} = hanoidb_reader:open("testdata", [{expiry_secs,0}]),
Middle = Max div 2,
{ok, <<"valuevalue/", Middle:128>>} = hanoidb_reader:lookup(IN, <<Middle:128>>),