Add CRC32 data validation
If a KV entry fails CRC validation, it is simply skipped for now. TODO: decide how to report broken KVs.
This commit is contained in:
parent
472ba4551e
commit
a8a66a43a0
2 changed files with 88 additions and 7 deletions
|
@ -153,7 +153,6 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
|
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
|
||||||
{ok, Pos};
|
{ok, Pos};
|
||||||
lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
|
lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
|
||||||
|
@ -246,8 +245,7 @@ read_node(File) ->
|
||||||
0 -> eof;
|
0 -> eof;
|
||||||
_ ->
|
_ ->
|
||||||
{ok, Data} = file:read(File, Len-2),
|
{ok, Data} = file:read(File, Len-2),
|
||||||
{ok, Node} = hanoi_util:decode_index_node(Level, Data),
|
hanoi_util:decode_index_node(Level, Data)
|
||||||
{ok, Node}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include("src/hanoi.hrl").
|
||||||
|
|
||||||
index_file_name(Name) ->
|
index_file_name(Name) ->
|
||||||
Name.
|
Name.
|
||||||
|
@ -51,19 +52,21 @@ estimate_node_size_increment(_KVList,Key,Value) ->
|
||||||
|
|
||||||
encode_index_node(KVList, Compress) ->
|
encode_index_node(KVList, Compress) ->
|
||||||
|
|
||||||
TermData = erlang:term_to_binary(KVList),
|
TermData = encode_kv_list(KVList),
|
||||||
|
|
||||||
case Compress of
|
case Compress of
|
||||||
snappy ->
|
snappy ->
|
||||||
|
DataSize = erlang:iolist_size(TermData),
|
||||||
{ok, Snappied} = snappy:compress(TermData),
|
{ok, Snappied} = snappy:compress(TermData),
|
||||||
if byte_size(Snappied) > byte_size(TermData) ->
|
if byte_size(Snappied) > DataSize ->
|
||||||
OutData = [?NO_COMPRESSION|TermData];
|
OutData = [?NO_COMPRESSION|TermData];
|
||||||
true ->
|
true ->
|
||||||
OutData = [?SNAPPY_COMPRESSION|Snappied]
|
OutData = [?SNAPPY_COMPRESSION|Snappied]
|
||||||
end;
|
end;
|
||||||
gzip ->
|
gzip ->
|
||||||
|
DataSize = erlang:iolist_size(TermData),
|
||||||
GZipData = zlib:gzip(TermData),
|
GZipData = zlib:gzip(TermData),
|
||||||
if byte_size(GZipData) > byte_size(TermData) ->
|
if byte_size(GZipData) > DataSize ->
|
||||||
OutData = [?NO_COMPRESSION|TermData];
|
OutData = [?NO_COMPRESSION|TermData];
|
||||||
true ->
|
true ->
|
||||||
OutData = [?GZIP_COMPRESSION|GZipData]
|
OutData = [?GZIP_COMPRESSION|GZipData]
|
||||||
|
@ -85,7 +88,7 @@ decode_index_node(Level, <<Tag, Data/binary>>) ->
|
||||||
TermData = zlib:gunzip(Data)
|
TermData = zlib:gunzip(Data)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
KVList = erlang:binary_to_term(TermData),
|
KVList = decode_kv_list(TermData),
|
||||||
{ok, {node, Level, KVList}}.
|
{ok, {node, Level, KVList}}.
|
||||||
|
|
||||||
|
|
||||||
|
@ -96,3 +99,83 @@ file_exists(FileName) ->
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
%% Leading byte of a payload produced by erlang:term_to_binary/1 (the
%% external term format version tag). decode_kv_list/1 uses it to
%% recognise nodes written in the older, non-CRC encoding.
-define(ERLANG_ENCODED, 131).
|
||||||
|
%% Leading byte of a payload produced by encode_kv_list/1, i.e. a
%% sequence of CRC32-framed entries.
-define(CRC_ENCODED, 127).
|
||||||
|
|
||||||
|
%% Entry tag: 32-bit key length, key bytes, then the value bytes.
-define(TAG_KV_DATA, 16#80).
|
||||||
|
%% Entry tag: a key whose value is ?TOMBSTONE; the rest of the entry is the key.
-define(TAG_DELETED, 16#81).
|
||||||
|
%% Entry tag: 64-bit Pos and 32-bit Len, followed by the key.
-define(TAG_POSLEN32, 16#82).
|
||||||
|
%% Byte appended after every framed entry by encode/1; also the marker
%% find_next_value/1 scans for when resynchronising after a broken chunk.
-define(TAG_END, 16#FF).
|
||||||
|
|
||||||
|
%% @doc Frame one entry payload for storage inside an index node.
%%
%% `Blob' is iodata. The result is iodata laid out as:
%%   32-bit unsigned size of Blob,
%%   32-bit unsigned CRC32 of Blob,
%%   Blob itself,
%%   a trailing ?TAG_END byte.
encode(Blob) ->
    Checksum = erlang:crc32(Blob),
    Length = erlang:iolist_size(Blob),
    Header = <<Length:32/unsigned, Checksum:32/unsigned>>,
    [Header, Blob, ?TAG_END].
|
||||||
|
|
||||||
|
%% @doc Serialise a key/value list into the CRC-framed node format.
%%
%% The result is iodata: the ?CRC_ENCODED marker byte followed by one
%% framed entry (see encode/1) per element of `KVList'. Entries are
%% prepended while folding, so they are emitted in reverse list order;
%% decode_crc_data/2 accumulates the same way, which restores the
%% original order on decode.
%%
%% An element that matches none of the supported shapes fails with
%% function_clause.
encode_kv_list(KVList) ->
    Frames = lists:foldl(fun(KV, Acc) ->
                                 [encode(encode_kv_entry(KV)) | Acc]
                         end,
                         [],
                         KVList),
    [?CRC_ENCODED | Frames].

%% Build the tagged (not yet CRC-framed) payload for a single entry.
encode_kv_entry({Key, Value}) when is_binary(Key), is_binary(Value) ->
    [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key, Value];
encode_kv_entry({Key, ?TOMBSTONE}) ->
    [?TAG_DELETED, Key];
encode_kv_entry({Key, {Pos, Len}}) when Len < 16#ffffffff ->
    [?TAG_POSLEN32, <<Pos:64/unsigned, Len:32/unsigned>>, Key].
|
||||||
|
|
||||||
|
|
||||||
|
%% @doc Decode the (already decompressed) payload of an index node
%% into its list of entries.
%%
%% The first byte selects the format:
%%   ?ERLANG_ENCODED - legacy payload written with erlang:term_to_binary/1;
%%                     the whole binary, tag byte included, is handed to
%%                     erlang:binary_to_term/1.
%%   ?CRC_ENCODED    - CRC32-framed entries produced by encode_kv_list/1;
%%                     the marker byte is stripped and the rest is
%%                     decoded by decode_crc_data/2.
%%
%% Any other leading byte fails with function_clause.
decode_kv_list(<<?ERLANG_ENCODED, _/binary>> = TermData) ->
    %% Must be binary_to_term/1: term_to_binary/1 would re-encode the
    %% binary and return a binary instead of the stored KV list.
    erlang:binary_to_term(TermData);
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
    decode_crc_data(Custom, []).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%% @doc Decode a sequence of CRC32-framed entries (see encode/1).
%%
%% Decoded entries are prepended to `Entries' as they are read, and the
%% accumulator is returned once the input is exhausted.
%%
%% A well-formed frame whose CRC32 does not match is skipped. Input that
%% does not match the frame layout at all is resynchronised through
%% find_next_value/1 and decoding continues from there.
decode_crc_data(<<>>, Entries) ->
    Entries;
decode_crc_data(<<Size:32/unsigned, Expected:32/unsigned,
                  Payload:Size/binary, ?TAG_END, Tail/binary>>,
                Entries) ->
    case erlang:crc32(Payload) of
        Expected ->
            decode_crc_data(Tail, [decode_kv_data(Payload) | Entries]);
        _Mismatch ->
            %% chunk is broken, ignore it. Maybe we should tell someone?
            decode_crc_data(Tail, Entries)
    end;
decode_crc_data(Unframed, Entries) ->
    %% The frame layout itself is broken: look for the next ?TAG_END
    %% and resume decoding right after it.
    decode_crc_data(find_next_value(Unframed), Entries).
|
||||||
|
|
||||||
|
%% @doc Skip ahead to just past the first ?TAG_END byte in `Bin'.
%%
%% Returns the bytes following that marker, or the empty binary when
%% `Bin' is empty or contains no ?TAG_END at all.
find_next_value(Bin) ->
    case binary:split(Bin, <<?TAG_END>>) of
        [_Skipped, MaybeGood] ->
            %% TODO: tell someone? that we skipped _Skipped. If we store
            %% the data somewhere, maybe something can be recovered
            %% from it ...
            MaybeGood;
        [_NoMarker] ->
            <<>>
    end.
|
||||||
|
|
||||||
|
%% @doc Decode the payload of one CRC-validated entry into a
%% `{Key, Value}' pair, dispatching on the leading tag byte:
%%   ?TAG_KV_DATA  -> {Key, Value}       (32-bit key length, key, value)
%%   ?TAG_DELETED  -> {Key, ?TOMBSTONE}  (remaining bytes are the key)
%%   ?TAG_POSLEN32 -> {Key, {Pos, Len}}  (64-bit Pos, 32-bit Len, key)
%%
%% A payload with any other tag fails with function_clause.
decode_kv_data(<<?TAG_KV_DATA, KeySize:32, Key:KeySize/binary, Value/binary>>) ->
    {Key, Value};
decode_kv_data(<<?TAG_DELETED, Key/binary>>) ->
    {Key, ?TOMBSTONE};
decode_kv_data(<<?TAG_POSLEN32, Pos:64, Len:32, Key/binary>>) ->
    {Key, {Pos, Len}}.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue