First steps towards intelligent CRC error handling

Current code base silently ignores CRC errors,
meaning that KVs that have errors will just
disappear, or may show up as a previously stored
value for the same key.
This commit is contained in:
Kresten Krab Thorup 2012-05-07 23:58:44 +02:00
parent 14dd00ad12
commit c58b627661
3 changed files with 39 additions and 25 deletions

View file

@ -75,7 +75,7 @@ fill_cache(Transaction, Cache) when is_list(Transaction) ->
read_nursery_from_log(Directory, MaxLevel) ->
{ok, LogBinary} = file:read_file( ?LOGFILENAME(Directory) ),
KVs = hanoidb_util:decode_crc_data( LogBinary, [] ),
{ok, KVs} = hanoidb_util:decode_crc_data( LogBinary, [], [] ),
Cache = fill_cache(KVs, gb_trees:empty()),
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}.

View file

@ -243,9 +243,13 @@ lookup_in_node(File,#node{members=Members},Key) ->
PID = proc_lib:spawn_link(fun() ->
receive
?CALL(From,read) ->
{ok, Node} = read_node(File, {Pos,Size}),
Result = lookup_in_node2(File, Node, Key),
plain_rpc:send_reply(From, Result)
case read_node(File, {Pos,Size}) of
{ok, Node} ->
Result = lookup_in_node2(File, Node, Key),
plain_rpc:send_reply(From, Result);
{error, _}=Error ->
plain_rpc:send_reply(From, Error)
end
end
end),
try plain_rpc:call(PID, read)
@ -271,8 +275,12 @@ lookup_in_node2(_File,#node{level=0,members=Members},Key) ->
lookup_in_node2(File,#node{members=Members},Key) ->
case find_1(Key, Members) of
{ok, {Pos,Size}} ->
{ok, Node} = read_node(File, {Pos,Size}),
lookup_in_node2(File, Node, Key);
case read_node(File, {Pos,Size}) of
{ok, Node} ->
lookup_in_node2(File, Node, Key);
{error, _}=Error ->
Error
end;
not_found ->
not_found
end.

View file

@ -118,7 +118,7 @@ decode_index_node(Level, <<Tag, Data/binary>>) ->
TermData = zlib:gunzip(Data)
end,
KVList = decode_kv_list(TermData),
{ok, KVList} = decode_kv_list(TermData),
{ok, {node, Level, KVList}}.
@ -145,45 +145,50 @@ crc_encapsulate(Blob) ->
[ << (Size):32/unsigned, CRC:32/unsigned >>, Blob, ?TAG_END ].
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
erlang:term_to_binary(TermData);
{ok, erlang:term_to_binary(TermData)};
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(Custom, []).
decode_crc_data(Custom, [], []).
decode_crc_data(<<>>, Acc) ->
lists:reverse(Acc);
decode_crc_data(<<>>, [], Acc) ->
{ok, lists:reverse(Acc)};
decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Acc) ->
decode_crc_data(<<>>, BrokenData, Acc) ->
{ok, lists:reverse(Acc)};
%%
%% TODO: here we *should* report data corruption rather than
%% simply returning "the good parts".
%%
%% {error, data_corruption};
decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Broken, Acc) ->
CRCTest = erlang:crc32( Bin ),
if CRC == CRCTest ->
decode_crc_data(Rest, [ decode_kv_data( Bin ) | Acc ]);
decode_crc_data(Rest, Broken, [ decode_kv_data( Bin ) | Acc ]);
true ->
%% chunk is broken, ignore it. Maybe we should tell someone?
decode_crc_data(Rest, Acc)
decode_crc_data(Rest, [Bin|Broken], Acc)
end;
decode_crc_data(Bad, Acc) ->
decode_crc_data(Bad, Broken, Acc) ->
%% if a chunk is broken, try to find the next ?TAG_END and
%% start decoding from there.
decode_crc_data(find_next_value(Bad), Acc).
{Skipped, MaybeGood} = find_next_value(Bad),
decode_crc_data(MaybeGood, [Skipped|Broken], Acc).
find_next_value(<<>>) ->
<<>>;
{<<>>, <<>>};
find_next_value(Bin) ->
case binary:match (Bin, <<?TAG_END>>) of
{Pos, _Len} ->
<<_SkipBin :Pos /binary, ?TAG_END, MaybeGood /binary>> = Bin,
<<SkipBin :Pos /binary, ?TAG_END, MaybeGood /binary>> = Bin,
%% TODO: tell someone? that we skipped _SkipBin. If we store
%% the data somewhere, maybe something can be recovered
%% from it ...
MaybeGood;
{SkipBin, MaybeGood};
nomatch ->
<<>>
{Bin, <<>>}
end.
decode_kv_data(<<?TAG_KV_DATA, KLen:32/unsigned, Key:KLen/binary, Value/binary >>) ->
@ -196,7 +201,8 @@ decode_kv_data(<<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned, Key/binary>>)
{Key, {Pos,Len}};
decode_kv_data(<<?TAG_TRANSACT, Rest/binary>>) ->
decode_crc_data(Rest, []).
{ok, TX} = decode_crc_data(Rest, [], []),
TX.