Add hanoi:transact, and CRC checks for nursery.log

This involves some cleanup/reorg of code
in hanoi_util.  Streaming trees and nursery
now use the same cry checking code.

Future: Keep the CRC-encoded binary around, 
and reuse it when writing trees.  This will reduce
cpu costs involved in re-computing those all the
time.
This commit is contained in:
Kresten Krab Thorup 2012-04-26 17:18:49 +02:00
parent 67f1c46b7e
commit b07d16d292
3 changed files with 119 additions and 99 deletions

View file

@ -31,7 +31,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([open/1, open/2, write/2, close/1, get/2, lookup/2, delete/2, put/3,
-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3,
async_fold/3, async_fold_range/4,
fold/3, fold_range/4]).
@ -43,9 +43,10 @@
-record(state, { top, nursery, dir, opt, max_level }).
%% PUBLIC API
-type hanoi() :: pid().
% @doc
% Create or open existing hanoi store. Argument `Dir' names a
% directory in which to keep the data files. By convention, we
@ -81,17 +82,21 @@ get(Ref,Key) when is_binary(Key) ->
lookup(Ref,Key) when is_binary(Key) ->
gen_server:call(Ref, {get, Key}, infinity).
-spec delete(hanoi(), binary()) ->
ok | {error, term()}.
delete(Ref,Key) when is_binary(Key) ->
gen_server:call(Ref, {delete, Key}, infinity).
-spec put(hanoi(), binary(), binary()) ->
ok | {error, term()}.
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
gen_server:call(Ref, {put, Key, Value}, infinity).
-type write_spec() :: {put, binary(), binary()} | {delete, binary()}.
-spec write(hanoi(), [write_spec()]) ->
-type transact_spec() :: {put, binary(), binary()} | {delete, binary()}.
-spec transact(hanoi(), [transact_spec()]) ->
ok | {error, term()}.
write(Ref, Spec) ->
gen_server:call(Ref, {write, Spec}, infinity).
transact(Ref, TransactionSpec) ->
gen_server:call(Ref, {transact, TransactionSpec}, infinity).
fold(Ref,Fun,Acc0) ->
fold_range(Ref,Fun,Acc0,#btree_range{from_key= <<>>, to_key=undefined}).
@ -319,8 +324,8 @@ handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Valu
{ok, State2} = do_put(Key, Value, State),
{reply, ok, State2};
handle_call({write, Spec}, _From, State) ->
{ok, State2} = do_write(Spec, State),
handle_call({transact, TransactionSpec}, _From, State) ->
{ok, State2} = do_transact(TransactionSpec, State),
{reply, ok, State2};
handle_call({delete, Key}, _From, State) when is_binary(Key) ->
@ -353,18 +358,15 @@ do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoi_nursery:add_maybe_flush(Key, Value, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.
do_write(Spec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoi_nursery:ensure_space(Nursery, length(Spec), Top),
State2 = State#state{ nursery=Nursery2 },
lists:foldl(fun({put, Key, Value}, {ok, State3}) ->
do_put(Key, Value, State3);
({delete, Key}, {ok, State3}) ->
do_put(Key, ?TOMBSTONE, State3);
(_, {error, _}=Error) ->
Error
end,
{ok, State2},
Spec).
do_transact([{put, Key, Value}], State) ->
do_put(Key, Value, State);
do_transact([{delete, Key}], State) ->
do_put(Key, ?TOMBSTONE, State);
do_transact([], _State) ->
ok;
do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoi_nursery:transact(TransactionSpec, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.
flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel}) ->
ok = hanoi_nursery:finish(Nursery, Top),

View file

@ -26,7 +26,7 @@
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/3, set_max_level/2, ensure_space/3]).
-export([do_level_fold/3, set_max_level/2, transact/3]).
-include("include/hanoi.hrl").
-include("hanoi.hrl").
@ -67,41 +67,18 @@ do_recover(Directory, TopLevel, MaxLevel) ->
ok.
fill_cache({Key,Value}, Cache)
when is_binary(Value); Value =:= ?TOMBSTONE ->
gb_trees:enter(Key, Value, Cache);
fill_cache(Transaction, Cache) when is_list(Transaction) ->
lists:foldl(fun fill_cache/2, Cache, Transaction).
read_nursery_from_log(Directory, MaxLevel) ->
{ok, LogFile} = file:open( ?LOGFILENAME(Directory), [raw, read, read_ahead, binary] ),
{ok, Cache} = load_good_chunks(LogFile, gb_trees:empty()),
ok = file:close(LogFile),
{ok, LogBinary} = file:read_file( ?LOGFILENAME(Directory) ),
KVs = hanoi_util:decode_crc_data( LogBinary, [] ),
Cache = fill_cache(KVs, gb_trees:empty()),
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}.
%% Just read the log file into a cache (gb_tree).
%% If any errors happen here, then we simply ignore them and return
%% the values we got so far.
load_good_chunks(File, Cache) ->
case file:read(File, 8) of
{ok, <<Length:32, BNotLength:32>>} when BNotLength =:= bnot Length->
case file:read(File, Length) of
{ok, EncData} when byte_size(EncData) == Length ->
try
<<KeySize:32/unsigned, Key:KeySize/binary, ValBin/binary>> = EncData,
case ValBin of
<<>> -> Value=?TOMBSTONE;
<<ValueSize:32/unsigned, Value/binary>> when ValueSize==byte_size(Value) -> ok
end,
%% TODO: is this tail recursive? I don't think so
load_good_chunks(File, gb_trees:enter(Key, Value, Cache))
catch
_:_ -> {ok, Cache}
end;
eof ->
{ok, Cache};
{error, _} ->
{ok, Cache}
end;
_ ->
{ok, Cache}
end.
% @doc
% Add a Key/Value to the nursery
@ -109,15 +86,21 @@ load_good_chunks(File, Cache) ->
-spec add(#nursery{}, binary(), binary()|?TOMBSTONE) -> {ok, #nursery{}}.
add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count }, Key, Value) ->
Size = 4 + byte_size(Key)
+ if Value=:=?TOMBSTONE -> 0;
true -> 4 + byte_size(Value) end,
Data = hanoi_util:crc_encapsulate_kv_entry( Key, Value ),
ok = file:write(File, Data),
file:write(File, [<<Size:32/unsigned>>, <<(bnot Size):32/unsigned>>,
<<(byte_size(Key)):32>>, Key]
++ if Value /= ?TOMBSTONE -> [<<(byte_size(Value)):32>>, Value];
true -> [] end),
Nursery1 = do_sync(File, Nursery),
Cache2 = gb_trees:enter(Key, Value, Cache),
Nursery2 = Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count+1 },
if
Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
{full, Nursery2};
true ->
{ok, Nursery2}
end.
do_sync(File, Nursery) ->
case application:get_env(hanoi, sync_strategy) of
{ok, sync} ->
file:datasync(File),
@ -134,14 +117,8 @@ add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Co
LastSync = Nursery#nursery.last_sync
end,
Cache2 = gb_trees:enter(Key, Value, Cache),
Nursery2 = Nursery#nursery{ cache=Cache2, total_size=TotalSize+Size+16, count=Count+1, last_sync=LastSync },
if
Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
{full, Nursery2};
true ->
{ok, Nursery2}
end.
Nursery#nursery{ last_sync = LastSync }.
lookup(Key, #nursery{ cache=Cache }) ->
gb_trees:lookup(Key, Cache).
@ -225,6 +202,26 @@ ensure_space(Nursery, NeededRoom, Top) ->
flush(Nursery, Top)
end.
transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSize }, Top) ->
Nursery1 = ensure_space(Nursery, length(Spec), Top),
Data = hanoi_util:crc_encapsulate_transaction( Spec ),
ok = file:write(File, Data),
Nursery2 = do_sync(File, Nursery1),
Cache2 = lists:foldl(fun({put, Key, Value}, Cache) ->
gb_trees:enter(Key, Value, Cache);
({delete, Key}, Cache) ->
gb_trees:enter(Key, ?TOMBSTONE, Cache)
end,
Cache0,
Spec),
Count = gb_trees:size(Cache2),
{ok, Nursery2#nursery{ cache=Cache2, total_size=TotalSize+byte_size(Data), count=Count }}.
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
Ref = erlang:make_ref(),

View file

@ -29,9 +29,34 @@
-include("src/hanoi.hrl").
-define(ERLANG_ENCODED, 131).
-define(CRC_ENCODED, 127).
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_END, 16#FF).
-compile({inline, [
crc_encapsulate/1, crc_encapsulate_kv_entry/2
]}).
index_file_name(Name) ->
Name.
file_exists(FileName) ->
case file:read_file_info(FileName) of
{ok, _} ->
true;
{error, enoent} ->
false
end.
estimate_node_size_increment(_KVList,Key,Value) ->
byte_size(Key)
+ 10
@ -52,7 +77,11 @@ estimate_node_size_increment(_KVList,Key,Value) ->
encode_index_node(KVList, Compress) ->
TermData = encode_kv_list(KVList),
TermData = [ ?CRC_ENCODED |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ],
case Compress of
snappy ->
@ -77,6 +106,7 @@ encode_index_node(KVList, Compress) ->
{ok, OutData}.
decode_index_node(Level, <<Tag, Data/binary>>) ->
case Tag of
@ -92,41 +122,28 @@ decode_index_node(Level, <<Tag, Data/binary>>) ->
{ok, {node, Level, KVList}}.
file_exists(FileName) ->
case file:read_file_info(FileName) of
{ok, _} ->
true;
{error, enoent} ->
false
end.
crc_encapsulate_kv_entry(Key, ?TOMBSTONE) ->
crc_encapsulate( [?TAG_DELETED | Key] );
crc_encapsulate_kv_entry(Key, Value) when is_binary(Value) ->
crc_encapsulate( [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key | Value] );
crc_encapsulate_kv_entry(Key, {Pos,Len}) when Len < 16#ffffffff ->
crc_encapsulate( [?TAG_POSLEN32, <<Pos:64/unsigned, Len:32/unsigned>>, Key ] ).
-define(ERLANG_ENCODED, 131).
-define(CRC_ENCODED, 127).
crc_encapsulate_transaction(TransactionSpec) ->
crc_encapsulate( [?TAG_TRANSACT |
lists:map( fun({delete, Key}) ->
crc_encapsulate_kv_entry(Key, ?TOMBSTONE);
({put, Key, Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
TransactionSpec)] ).
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_END, 16#FF).
encode(Blob) ->
crc_encapsulate(Blob) ->
CRC = erlang:crc32(Blob),
Size = erlang:iolist_size(Blob),
[ << (Size):32/unsigned, CRC:32/unsigned >>, Blob, ?TAG_END ].
encode_kv_list(KVList) ->
[ ?CRC_ENCODED |
lists:foldl(fun({Key,Value}, Acc) when is_binary(Key), is_binary(Value) ->
[ encode( [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key, Value] ) | Acc ];
({Key, ?TOMBSTONE}, Acc) ->
[ encode( [?TAG_DELETED, Key] ) | Acc ];
({Key, {Pos,Len}}, Acc) when Len < 16#ffffffff ->
[ encode( [?TAG_POSLEN32, <<Pos:64/unsigned, Len:32/unsigned>>, Key ] ) | Acc ]
end,
[],
KVList) ].
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
erlang:term_to_binary(TermData);
@ -136,7 +153,7 @@ decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(<<>>, Acc) ->
Acc;
lists:reverse(Acc);
decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Acc) ->
CRCTest = erlang:crc32( Bin ),
@ -176,6 +193,10 @@ decode_kv_data(<<?TAG_DELETED, Key/binary>>) ->
{Key, ?TOMBSTONE};
decode_kv_data(<<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned, Key/binary>>) ->
{Key, {Pos,Len}}.
{Key, {Pos,Len}};
decode_kv_data(<<?TAG_TRANSACT, Rest/binary>>) ->
decode_crc_data(Rest, []).