Move checksum file related code to machi_csum_table #11
5 changed files with 296 additions and 181 deletions
|
@ -95,7 +95,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
||||||
try
|
try
|
||||||
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
|
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
|
||||||
{ok, InfoBin} ->
|
{ok, InfoBin} ->
|
||||||
{Info, _} = machi_flu1:split_checksum_list_blob_decode(InfoBin),
|
{Info, _} = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||||
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
|
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
|
||||||
[], Info),
|
[], Info),
|
||||||
{ok, Res};
|
{ok, Res};
|
||||||
|
|
220
src/machi_csum_table.erl
Normal file
220
src/machi_csum_table.erl
Normal file
|
@ -0,0 +1,220 @@
|
||||||
|
-module(machi_csum_table).
|
||||||
|
|
||||||
|
-export([open/2,
|
||||||
|
find/3, write/4, trim/3,
|
||||||
|
sync/1,
|
||||||
|
calc_unwritten_bytes/1,
|
||||||
|
close/1, delete/1]).
|
||||||
|
|
||||||
|
-export([encode_csum_file_entry/3, decode_csum_file_entry/1]).
|
||||||
|
|
||||||
|
-include("machi.hrl").
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([split_checksum_list_blob_decode/1]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
-record(machi_csum_table,
|
||||||
|
{file :: filename:filename(),
|
||||||
|
fd :: file:descriptor(),
|
||||||
|
table :: ets:tid()}).
|
||||||
|
|
||||||
|
-type table() :: #machi_csum_table{}.
|
||||||
|
-type byte_sequence() :: { Offset :: non_neg_integer(),
|
||||||
|
Size :: pos_integer()|infinity }.
|
||||||
|
|
||||||
|
-export_type([table/0]).
|
||||||
|
|
||||||
|
-spec open(filename:filename(), proplists:proplists()) ->
|
||||||
|
{ok, table()} | {error, file:posix()}.
|
||||||
|
open(CSumFilename, _Opts) ->
|
||||||
|
T = ets:new(?MODULE, [private, ordered_set]),
|
||||||
|
C0 = #machi_csum_table{
|
||||||
|
file=CSumFilename,
|
||||||
|
table=T},
|
||||||
|
case file:read_file(CSumFilename) of
|
||||||
|
%% , [read, raw, binary]) of
|
||||||
|
{ok, Bin} ->
|
||||||
|
List = case split_checksum_list_blob_decode(Bin) of
|
||||||
|
{List0, <<>>} ->
|
||||||
|
List0;
|
||||||
|
{List0, _Junk} ->
|
||||||
|
%% Partially written, needs repair TODO
|
||||||
|
%% [write(CSumFilename, List),
|
||||||
|
List0
|
||||||
|
end,
|
||||||
|
%% assuming all entries are strictly ordered by offset,
|
||||||
|
%% trim command should always come after checksum entry.
|
||||||
|
%% *if* by any chance that order cound not be kept, we
|
||||||
|
%% still can do ordering check and monotonic merge here.
|
||||||
|
ets:insert(T, List);
|
||||||
|
{error, enoent} ->
|
||||||
|
ok;
|
||||||
|
Error ->
|
||||||
|
throw(Error)
|
||||||
|
end,
|
||||||
|
{ok, Fd} = file:open(CSumFilename, [raw, binary, append]),
|
||||||
|
{ok, C0#machi_csum_table{fd=Fd}}.
|
||||||
|
|
||||||
|
-spec find(table(), machi_dt:chunk_pos(), machi_dt:chunk_size()) ->
|
||||||
|
{ok, machi_dt:chunk_csum()} | {error, trimmed|notfound}.
|
||||||
|
find(#machi_csum_table{table=T}, Offset, Size) ->
|
||||||
|
%% TODO: Check whether all bytes here are written or not
|
||||||
|
case ets:lookup(T, Offset) of
|
||||||
|
[{Offset, Size, trimmed}] -> {error, trimmed};
|
||||||
|
[{Offset, Size, Checksum}] -> {ok, Checksum};
|
||||||
|
[{Offset, _, _}] -> {error, unknown_chunk};
|
||||||
|
[] -> {error, unknown_chunk}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec write(table(), machi_dt:chunk_pos(), machi_dt:chunk_size(),
|
||||||
|
machi_dt:chunk_csum()) ->
|
||||||
|
ok | {error, used|file:posix()}.
|
||||||
|
write(#machi_csum_table{fd=Fd, table=T}, Offset, Size, CSum) ->
|
||||||
|
Binary = encode_csum_file_entry_bin(Offset, Size, CSum),
|
||||||
|
case file:write(Fd, Binary) of
|
||||||
|
ok ->
|
||||||
|
case ets:insert_new(T, {Offset, Size, CSum}) of
|
||||||
|
true ->
|
||||||
|
ok;
|
||||||
|
false ->
|
||||||
|
{error, written}
|
||||||
|
end;
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec trim(table(), machi_dt:chunk_pos(), machi_dt:chunk_size()) ->
|
||||||
|
ok | {error, file:posix()}.
|
||||||
|
trim(#machi_csum_table{fd=Fd, table=T}, Offset, Size) ->
|
||||||
|
Binary = encode_csum_file_entry_bin(Offset, Size, trimmed),
|
||||||
|
case file:write(Fd, Binary) of
|
||||||
|
ok ->
|
||||||
|
true = ets:insert(T, {Offset, Size, trimmed}),
|
||||||
|
ok;
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec sync(table()) -> ok | {error, file:posix()}.
|
||||||
|
sync(#machi_csum_table{fd=Fd}) ->
|
||||||
|
file:sync(Fd).
|
||||||
|
|
||||||
|
-spec calc_unwritten_bytes(table()) -> [byte_sequence()].
|
||||||
|
calc_unwritten_bytes(#machi_csum_table{table=T}) ->
|
||||||
|
case lists:sort(ets:tab2list(T)) of
|
||||||
|
[] ->
|
||||||
|
[{?MINIMUM_OFFSET, infinity}];
|
||||||
|
Sorted ->
|
||||||
|
{LastOffset, _, _} = hd(Sorted),
|
||||||
|
build_unwritten_bytes_list(Sorted, LastOffset, [])
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec close(table()) -> ok.
|
||||||
|
close(#machi_csum_table{table=T, fd=Fd}) ->
|
||||||
|
true = ets:delete(T),
|
||||||
|
ok = file:close(Fd).
|
||||||
|
|
||||||
|
-spec delete(table()) -> ok.
|
||||||
|
delete(#machi_csum_table{file=F} = C) ->
|
||||||
|
catch close(C),
|
||||||
|
case file:delete(F) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, enoent} -> ok;
|
||||||
|
E -> E
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
|
||||||
|
%% internal storage by the FLU.
|
||||||
|
|
||||||
|
-spec encode_csum_file_entry(
|
||||||
|
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||||
|
iolist().
|
||||||
|
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
||||||
|
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||||
|
[<<$w, Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
||||||
|
TaggedCSum].
|
||||||
|
|
||||||
|
%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for
|
||||||
|
%% internal storage by the FLU.
|
||||||
|
|
||||||
|
-spec encode_csum_file_entry_bin(
|
||||||
|
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||||
|
binary().
|
||||||
|
encode_csum_file_entry_bin(Offset, Size, trimmed) ->
|
||||||
|
<<$t, Offset:64/unsigned-big, Size:32/unsigned-big>>;
|
||||||
|
encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
|
||||||
|
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||||
|
<<$w, Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
|
||||||
|
TaggedCSum/binary>>.
|
||||||
|
|
||||||
|
%% @doc Decode a single `binary()' blob into an
|
||||||
|
%% `{Offset,Size,TaggedCSum}' tuple.
|
||||||
|
%%
|
||||||
|
%% The internal encoding (which is currently exposed to the outside world
|
||||||
|
%% via this function and related ones) is:
|
||||||
|
%%
|
||||||
|
%% <ul>
|
||||||
|
%% <li> 1 byte: record length
|
||||||
|
%% </li>
|
||||||
|
%% <li> 8 bytes (unsigned big-endian): byte offset
|
||||||
|
%% </li>
|
||||||
|
%% <li> 4 bytes (unsigned big-endian): chunk size
|
||||||
|
%% </li>
|
||||||
|
%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
|
||||||
|
%% </li>
|
||||||
|
%% </ul>
|
||||||
|
%%
|
||||||
|
%% See `machi.hrl' for the tagged checksum types, e.g.,
|
||||||
|
%% `?CSUM_TAG_NONE'.
|
||||||
|
|
||||||
|
-spec decode_csum_file_entry(binary()) ->
|
||||||
|
error |
|
||||||
|
{machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
|
||||||
|
decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
|
||||||
|
{Offset, Size, TaggedCSum};
|
||||||
|
decode_csum_file_entry(_Else) ->
|
||||||
|
error.
|
||||||
|
|
||||||
|
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
||||||
|
%% `{Offset,Size,TaggedCSum}' tuples.
|
||||||
|
|
||||||
|
-spec split_checksum_list_blob_decode(binary()) ->
|
||||||
|
{list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
|
||||||
|
TrailingJunk::binary()}.
|
||||||
|
split_checksum_list_blob_decode(Bin) ->
|
||||||
|
split_checksum_list_blob_decode(Bin, []).
|
||||||
|
|
||||||
|
split_checksum_list_blob_decode(<<$w, Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
||||||
|
One = <<Len:8/unsigned-big, Part/binary>>,
|
||||||
|
case decode_csum_file_entry(One) of
|
||||||
|
error ->
|
||||||
|
split_checksum_list_blob_decode(Rest, Acc);
|
||||||
|
DecOne ->
|
||||||
|
split_checksum_list_blob_decode(Rest, [DecOne|Acc])
|
||||||
|
end;
|
||||||
|
split_checksum_list_blob_decode(<<$t, Offset:64/unsigned-big, Size:32/unsigned-big, Rest/binary>>, Acc) ->
|
||||||
|
%% trimmed offset
|
||||||
|
split_checksum_list_blob_decode(Rest, [{Offset, Size, trimmed}|Acc]);
|
||||||
|
split_checksum_list_blob_decode(Rest, Acc) ->
|
||||||
|
{lists:reverse(Acc), Rest}.
|
||||||
|
|
||||||
|
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
|
||||||
|
Size :: pos_integer(),
|
||||||
|
Checksum :: binary() }],
|
||||||
|
LastOffset :: non_neg_integer(),
|
||||||
|
Acc :: list() ) -> [byte_sequence()].
|
||||||
|
% @private Given a <b>sorted</b> list of checksum data tuples, return a sorted
|
||||||
|
% list of unwritten byte ranges. The output list <b>always</b> has at least one
|
||||||
|
% entry: the last tuple in the list is guaranteed to be the current end of
|
||||||
|
% bytes written to a particular file with the special space moniker
|
||||||
|
% `infinity'.
|
||||||
|
build_unwritten_bytes_list([], Last, Acc) ->
|
||||||
|
NewAcc = [ {Last, infinity} | Acc ],
|
||||||
|
lists:reverse(NewAcc);
|
||||||
|
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when
|
||||||
|
CurrentOffset /= LastOffset ->
|
||||||
|
Hole = CurrentOffset - LastOffset,
|
||||||
|
build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
|
||||||
|
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
|
||||||
|
build_unwritten_bytes_list(Rest, CO + CS, Acc).
|
|
@ -89,7 +89,7 @@
|
||||||
eof_position = 0 :: non_neg_integer(),
|
eof_position = 0 :: non_neg_integer(),
|
||||||
unwritten_bytes = [] :: [byte_sequence()],
|
unwritten_bytes = [] :: [byte_sequence()],
|
||||||
data_filehandle :: file:filehandle(),
|
data_filehandle :: file:filehandle(),
|
||||||
csum_filehandle :: file:filehandle(),
|
csum_table :: machi_csum_table:table(),
|
||||||
tref :: reference(), %% timer ref
|
tref :: reference(), %% timer ref
|
||||||
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
|
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
|
||||||
ops = 0 :: non_neg_integer(), %% sum of all ops
|
ops = 0 :: non_neg_integer(), %% sum of all ops
|
||||||
|
@ -196,10 +196,10 @@ init({Filename, DataDir}) ->
|
||||||
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
||||||
ok = filelib:ensure_dir(CsumFile),
|
ok = filelib:ensure_dir(CsumFile),
|
||||||
ok = filelib:ensure_dir(DPath),
|
ok = filelib:ensure_dir(DPath),
|
||||||
UnwrittenBytes = parse_csum_file(CsumFile),
|
{ok, CsumTable} = machi_csum_table:open(CsumFile, []),
|
||||||
|
UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable),
|
||||||
{Eof, infinity} = lists:last(UnwrittenBytes),
|
{Eof, infinity} = lists:last(UnwrittenBytes),
|
||||||
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
||||||
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
|
|
||||||
Tref = schedule_tick(),
|
Tref = schedule_tick(),
|
||||||
St = #state{
|
St = #state{
|
||||||
filename = Filename,
|
filename = Filename,
|
||||||
|
@ -207,7 +207,7 @@ init({Filename, DataDir}) ->
|
||||||
data_path = DPath,
|
data_path = DPath,
|
||||||
csum_file = CsumFile,
|
csum_file = CsumFile,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc,
|
csum_table = CsumTable,
|
||||||
tref = Tref,
|
tref = Tref,
|
||||||
unwritten_bytes = UnwrittenBytes,
|
unwritten_bytes = UnwrittenBytes,
|
||||||
eof_position = Eof},
|
eof_position = Eof},
|
||||||
|
@ -224,15 +224,15 @@ handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
|
||||||
R = file:sync(FHd),
|
R = file:sync(FHd),
|
||||||
{reply, R, State};
|
{reply, R, State};
|
||||||
|
|
||||||
handle_call({sync, csum}, _From, State = #state{ csum_filehandle = FHc }) ->
|
handle_call({sync, csum}, _From, State = #state{ csum_table = T }) ->
|
||||||
R = file:sync(FHc),
|
R = machi_csum_table:sync(T),
|
||||||
{reply, R, State};
|
{reply, R, State};
|
||||||
|
|
||||||
handle_call({sync, all}, _From, State = #state{filename = F,
|
handle_call({sync, all}, _From, State = #state{filename = F,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_table = T
|
||||||
}) ->
|
}) ->
|
||||||
R = file:sync(FHc),
|
R = machi_csum_table:sync(T),
|
||||||
R1 = file:sync(FHd),
|
R1 = file:sync(FHd),
|
||||||
Resp = case {R, R1} of
|
Resp = case {R, R1} of
|
||||||
{ok, ok} -> ok;
|
{ok, ok} -> ok;
|
||||||
|
@ -267,11 +267,17 @@ handle_call({read, Offset, Length}, _From,
|
||||||
handle_call({read, Offset, Length}, _From,
|
handle_call({read, Offset, Length}, _From,
|
||||||
State = #state{filename = F,
|
State = #state{filename = F,
|
||||||
data_filehandle = FH,
|
data_filehandle = FH,
|
||||||
|
csum_table = CsumTable,
|
||||||
unwritten_bytes = U,
|
unwritten_bytes = U,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'!
|
Checksum = case machi_csum_table:find(CsumTable, Offset, Length) of
|
||||||
|
{ok, Checksum0} ->
|
||||||
|
Checksum0;
|
||||||
|
_ ->
|
||||||
|
undefined
|
||||||
|
end,
|
||||||
|
|
||||||
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
|
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
|
||||||
{ok, Bytes, Csum} ->
|
{ok, Bytes, Csum} ->
|
||||||
|
@ -296,7 +302,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
|
||||||
filename = F,
|
filename = F,
|
||||||
writes = {T, Err},
|
writes = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_table = CsumTable
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
|
@ -309,7 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
|
||||||
[ClientCsum, Bad]),
|
[ClientCsum, Bad]),
|
||||||
{{error, bad_checksum}, Err + 1, U};
|
{{error, bad_checksum}, Err + 1, U};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of
|
case handle_write(FHd, CsumTable, F, TaggedCsum, Offset, Data, U) of
|
||||||
{ok, NewU1} ->
|
{ok, NewU1} ->
|
||||||
{ok, Err, NewU1};
|
{ok, Err, NewU1};
|
||||||
Error ->
|
Error ->
|
||||||
|
@ -336,7 +342,7 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
|
||||||
filename = F,
|
filename = F,
|
||||||
appends = {T, Err},
|
appends = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_table = CsumTable
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
|
@ -349,7 +355,7 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
|
||||||
[ClientCsum, Bad]),
|
[ClientCsum, Bad]),
|
||||||
{{error, bad_checksum}, Err + 1, U};
|
{{error, bad_checksum}, Err + 1, U};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of
|
case handle_write(FHd, CsumTable, F, TaggedCsum, EofP, Data, U) of
|
||||||
{ok, NewU1} ->
|
{ok, NewU1} ->
|
||||||
{{ok, F, EofP}, Err, NewU1};
|
{{ok, F, EofP}, Err, NewU1};
|
||||||
Error ->
|
Error ->
|
||||||
|
@ -450,7 +456,7 @@ handle_info(Req, State) ->
|
||||||
% @private
|
% @private
|
||||||
terminate(Reason, #state{filename = F,
|
terminate(Reason, #state{filename = F,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc,
|
csum_table = T,
|
||||||
reads = {RT, RE},
|
reads = {RT, RE},
|
||||||
writes = {WT, WE},
|
writes = {WT, WE},
|
||||||
appends = {AT, AE}
|
appends = {AT, AE}
|
||||||
|
@ -461,9 +467,9 @@ terminate(Reason, #state{filename = F,
|
||||||
lager:info(" Writes: ~p/~p", [WT, WE]),
|
lager:info(" Writes: ~p/~p", [WT, WE]),
|
||||||
lager:info("Appends: ~p/~p", [AT, AE]),
|
lager:info("Appends: ~p/~p", [AT, AE]),
|
||||||
ok = file:sync(FHd),
|
ok = file:sync(FHd),
|
||||||
ok = file:sync(FHc),
|
|
||||||
ok = file:close(FHd),
|
ok = file:close(FHd),
|
||||||
ok = file:close(FHc),
|
ok = machi_csum_table:sync(T),
|
||||||
|
ok = machi_csum_table:close(T),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
% @private
|
% @private
|
||||||
|
@ -496,36 +502,6 @@ check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
|
||||||
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
||||||
lager:warning("Unknown checksum tag ~p", [OtherTag]),
|
lager:warning("Unknown checksum tag ~p", [OtherTag]),
|
||||||
{error, bad_checksum}.
|
{error, bad_checksum}.
|
||||||
|
|
||||||
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
|
||||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
|
||||||
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
|
||||||
TaggedCSum].
|
|
||||||
|
|
||||||
map_offsets_to_csums(CsumList) ->
|
|
||||||
lists:foreach(fun insert_offsets/1, CsumList).
|
|
||||||
|
|
||||||
insert_offsets({Offset, Length, Checksum}) ->
|
|
||||||
put({Offset, Length}, Checksum).
|
|
||||||
|
|
||||||
-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
|
|
||||||
parse_csum_file(Filename) ->
|
|
||||||
%% using file:read_file works as long as the files are "small"
|
|
||||||
try
|
|
||||||
{ok, CsumData} = file:read_file(Filename),
|
|
||||||
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
|
|
||||||
Sort = lists:sort(DecodedCsums),
|
|
||||||
case Sort of
|
|
||||||
[] -> [{?MINIMUM_OFFSET, infinity}];
|
|
||||||
_ ->
|
|
||||||
map_offsets_to_csums(DecodedCsums),
|
|
||||||
{First, _, _} = hd(Sort),
|
|
||||||
build_unwritten_bytes_list(Sort, First, [])
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
_:{badmatch, {error, enoent}} ->
|
|
||||||
[{?MINIMUM_OFFSET, infinity}]
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec handle_read(FHd :: file:filehandle(),
|
-spec handle_read(FHd :: file:filehandle(),
|
||||||
Filename :: string(),
|
Filename :: string(),
|
||||||
|
@ -593,7 +569,7 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec handle_write( FHd :: file:filehandle(),
|
-spec handle_write( FHd :: file:filehandle(),
|
||||||
FHc :: file:filehandle(),
|
CsumTable :: machi_csum_table:table(),
|
||||||
Filename :: string(),
|
Filename :: string(),
|
||||||
TaggedCsum :: binary(),
|
TaggedCsum :: binary(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
|
@ -613,15 +589,20 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
% file, and the internal list of unwritten bytes is modified to reflect the
|
% file, and the internal list of unwritten bytes is modified to reflect the
|
||||||
% just-performed write. This is then returned to the caller as
|
% just-performed write. This is then returned to the caller as
|
||||||
% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
|
% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
|
||||||
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
Size = iolist_size(Data),
|
Size = iolist_size(Data),
|
||||||
|
|
||||||
case is_byte_range_unwritten(Offset, Size, U) of
|
case is_byte_range_unwritten(Offset, Size, U) of
|
||||||
false ->
|
false ->
|
||||||
case get({Offset, Size}) of
|
case machi_csum_table:find(CsumTable, Offset, Size) of
|
||||||
undefined ->
|
{error, trimmed} = Error ->
|
||||||
|
Error;
|
||||||
|
{error, unknown_chunk} ->
|
||||||
|
%% The specified has some bytes written, while
|
||||||
|
%% it's not in the checksum table. Trust U and
|
||||||
|
%% return as it is used.
|
||||||
{error, written};
|
{error, written};
|
||||||
TaggedCsum ->
|
{ok, TaggedCsum} ->
|
||||||
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
|
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
|
||||||
eof ->
|
eof ->
|
||||||
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
|
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
|
||||||
|
@ -632,7 +613,7 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
_ ->
|
_ ->
|
||||||
{error, written}
|
{error, written}
|
||||||
end;
|
end;
|
||||||
OtherCsum ->
|
{ok, OtherCsum} ->
|
||||||
%% Got a checksum, but it doesn't match the data block's
|
%% Got a checksum, but it doesn't match the data block's
|
||||||
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
|
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
|
||||||
[Offset, Filename, OtherCsum, TaggedCsum]),
|
[Offset, Filename, OtherCsum, TaggedCsum]),
|
||||||
|
@ -640,7 +621,7 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
try
|
try
|
||||||
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
|
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U)
|
||||||
catch
|
catch
|
||||||
%%% XXX FIXME: be more specific on badmatch that might
|
%%% XXX FIXME: be more specific on badmatch that might
|
||||||
%%% occur around line 593 when we write the checksum
|
%%% occur around line 593 when we write the checksum
|
||||||
|
@ -653,7 +634,7 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
% @private Implements the disk writes for both the write and append
|
% @private Implements the disk writes for both the write and append
|
||||||
% operation.
|
% operation.
|
||||||
-spec do_write( FHd :: file:descriptor(),
|
-spec do_write( FHd :: file:descriptor(),
|
||||||
FHc :: file:descriptor(),
|
CsumTable :: machi_csum_table:table(),
|
||||||
Filename :: string(),
|
Filename :: string(),
|
||||||
TaggedCsum :: binary(),
|
TaggedCsum :: binary(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
|
@ -662,14 +643,12 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
Unwritten :: [byte_sequence()]
|
Unwritten :: [byte_sequence()]
|
||||||
) -> {ok, NewUnwritten :: [byte_sequence()]} |
|
) -> {ok, NewUnwritten :: [byte_sequence()]} |
|
||||||
{error, Reason :: term()}.
|
{error, Reason :: term()}.
|
||||||
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) ->
|
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U) ->
|
||||||
case file:pwrite(FHd, Offset, Data) of
|
case file:pwrite(FHd, Offset, Data) of
|
||||||
ok ->
|
ok ->
|
||||||
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
||||||
[Filename, Offset, Size]),
|
[Filename, Offset, Size]),
|
||||||
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
|
||||||
ok = file:write(FHc, EncodedCsum),
|
|
||||||
put({Offset, Size}, TaggedCsum),
|
|
||||||
NewU = update_unwritten(Offset, Size, U),
|
NewU = update_unwritten(Offset, Size, U),
|
||||||
lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
|
lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
|
||||||
[Filename, NewU]),
|
[Filename, NewU]),
|
||||||
|
@ -756,22 +735,3 @@ update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos ->
|
||||||
[{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}].
|
[{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}].
|
||||||
|
|
||||||
|
|
||||||
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
|
|
||||||
Size :: pos_integer(),
|
|
||||||
Checksum :: binary() }],
|
|
||||||
LastOffset :: non_neg_integer(),
|
|
||||||
Acc :: list() ) -> [byte_sequence()].
|
|
||||||
% @private Given a <b>sorted</b> list of checksum data tuples, return a sorted
|
|
||||||
% list of unwritten byte ranges. The output list <b>always</b> has at least one
|
|
||||||
% entry: the last tuple in the list is guaranteed to be the current end of
|
|
||||||
% bytes written to a particular file with the special space moniker
|
|
||||||
% `infinity'.
|
|
||||||
build_unwritten_bytes_list([], Last, Acc) ->
|
|
||||||
NewAcc = [ {Last, infinity} | Acc ],
|
|
||||||
lists:reverse(NewAcc);
|
|
||||||
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when
|
|
||||||
CurrentOffset /= LastOffset ->
|
|
||||||
Hole = CurrentOffset - LastOffset,
|
|
||||||
build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
|
|
||||||
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
|
|
||||||
build_unwritten_bytes_list(Rest, CO + CS, Acc).
|
|
||||||
|
|
|
@ -61,9 +61,6 @@
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
||||||
-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
|
|
||||||
decode_csum_file_entry/1,
|
|
||||||
split_checksum_list_blob/1, split_checksum_list_blob_decode/1]).
|
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
flu_name :: atom(),
|
flu_name :: atom(),
|
||||||
|
@ -707,99 +704,6 @@ make_listener_regname(BaseName) ->
|
||||||
make_projection_server_regname(BaseName) ->
|
make_projection_server_regname(BaseName) ->
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
||||||
|
|
||||||
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
|
|
||||||
%% internal storage by the FLU.
|
|
||||||
|
|
||||||
-spec encode_csum_file_entry(
|
|
||||||
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
|
||||||
iolist().
|
|
||||||
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
|
||||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
|
||||||
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
|
||||||
TaggedCSum].
|
|
||||||
|
|
||||||
%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for
|
|
||||||
%% internal storage by the FLU.
|
|
||||||
|
|
||||||
-spec encode_csum_file_entry_bin(
|
|
||||||
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
|
||||||
binary().
|
|
||||||
encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
|
|
||||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
|
||||||
<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
|
|
||||||
TaggedCSum/binary>>.
|
|
||||||
|
|
||||||
%% @doc Decode a single `binary()' blob into an
|
|
||||||
%% `{Offset,Size,TaggedCSum}' tuple.
|
|
||||||
%%
|
|
||||||
%% The internal encoding (which is currently exposed to the outside world
|
|
||||||
%% via this function and related ones) is:
|
|
||||||
%%
|
|
||||||
%% <ul>
|
|
||||||
%% <li> 1 byte: record length
|
|
||||||
%% </li>
|
|
||||||
%% <li> 8 bytes (unsigned big-endian): byte offset
|
|
||||||
%% </li>
|
|
||||||
%% <li> 4 bytes (unsigned big-endian): chunk size
|
|
||||||
%% </li>
|
|
||||||
%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
|
|
||||||
%% </li>
|
|
||||||
%% </ul>
|
|
||||||
%%
|
|
||||||
%% See `machi.hrl' for the tagged checksum types, e.g.,
|
|
||||||
%% `?CSUM_TAG_NONE'.
|
|
||||||
|
|
||||||
-spec decode_csum_file_entry(binary()) ->
|
|
||||||
error |
|
|
||||||
{machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
|
|
||||||
decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
|
|
||||||
{Offset, Size, TaggedCSum};
|
|
||||||
decode_csum_file_entry(_Else) ->
|
|
||||||
error.
|
|
||||||
|
|
||||||
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
|
||||||
%% unparsed `binary()' blobs, one per entry.
|
|
||||||
%%
|
|
||||||
%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if
|
|
||||||
%% desired.
|
|
||||||
%%
|
|
||||||
%% The return value `TrailingJunk' is unparseable bytes at the end of
|
|
||||||
%% the checksum list blob.
|
|
||||||
|
|
||||||
-spec split_checksum_list_blob(binary()) ->
|
|
||||||
{list(binary()), TrailingJunk::binary()}.
|
|
||||||
split_checksum_list_blob(Bin) ->
|
|
||||||
split_checksum_list_blob(Bin, []).
|
|
||||||
|
|
||||||
split_checksum_list_blob(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
|
||||||
case get(hack_length) of
|
|
||||||
Len -> ok;
|
|
||||||
_ -> put(hack_different, true)
|
|
||||||
end,
|
|
||||||
split_checksum_list_blob(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
|
|
||||||
split_checksum_list_blob(Rest, Acc) ->
|
|
||||||
{lists:reverse(Acc), Rest}.
|
|
||||||
|
|
||||||
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
|
||||||
%% `{Offset,Size,TaggedCSum}' tuples.
|
|
||||||
|
|
||||||
-spec split_checksum_list_blob_decode(binary()) ->
|
|
||||||
{list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
|
|
||||||
TrailingJunk::binary()}.
|
|
||||||
split_checksum_list_blob_decode(Bin) ->
|
|
||||||
split_checksum_list_blob_decode(Bin, []).
|
|
||||||
|
|
||||||
split_checksum_list_blob_decode(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
|
||||||
One = <<Len:8/unsigned-big, Part/binary>>,
|
|
||||||
case decode_csum_file_entry(One) of
|
|
||||||
error ->
|
|
||||||
split_checksum_list_blob_decode(Rest, Acc);
|
|
||||||
DecOne ->
|
|
||||||
split_checksum_list_blob_decode(Rest, [DecOne|Acc])
|
|
||||||
end;
|
|
||||||
split_checksum_list_blob_decode(Rest, Acc) ->
|
|
||||||
{lists:reverse(Acc), Rest}.
|
|
||||||
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
||||||
%% TODO: If the client was foolish enough to use
|
%% TODO: If the client was foolish enough to use
|
||||||
%% this type of non-checksum, then the client gets
|
%% this type of non-checksum, then the client gets
|
||||||
|
@ -859,9 +763,9 @@ timing_demo_test2() ->
|
||||||
{HexUSec, _} =
|
{HexUSec, _} =
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
lists:foldl(fun(X, _) ->
|
lists:foldl(fun(X, _) ->
|
||||||
B = encode_csum_file_entry_hex(X, 100, CSum),
|
B = machi_checksums:encode_csum_file_entry_hex(X, 100, CSum),
|
||||||
%% file:write(ZZZ, [B, 10]),
|
%% file:write(ZZZ, [B, 10]),
|
||||||
decode_csum_file_entry_hex(list_to_binary(B))
|
machi_checksums:decode_csum_file_entry_hex(list_to_binary(B))
|
||||||
end, x, Xs)
|
end, x, Xs)
|
||||||
end),
|
end),
|
||||||
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
||||||
|
@ -872,14 +776,14 @@ timing_demo_test2() ->
|
||||||
{NotSortedUSec, _} =
|
{NotSortedUSec, _} =
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
lists:foldl(fun(X, _) ->
|
lists:foldl(fun(X, _) ->
|
||||||
B = encode_csum_file_entry(X, 100, CSum),
|
B = machi_checksums:encode_csum_file_entry(X, 100, CSum),
|
||||||
decode_csum_file_entry(list_to_binary(B))
|
machi_checksums:decode_csum_file_entry(list_to_binary(B))
|
||||||
end, x, Xs)
|
end, x, Xs)
|
||||||
end),
|
end),
|
||||||
io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]),
|
io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]),
|
||||||
|
|
||||||
NotHexList = lists:foldl(fun(X, Acc) ->
|
NotHexList = lists:foldl(fun(X, Acc) ->
|
||||||
B = encode_csum_file_entry(X, 100, CSum),
|
B = machi_checksums:encode_csum_file_entry(X, 100, CSum),
|
||||||
[B|Acc]
|
[B|Acc]
|
||||||
end, [], Xs),
|
end, [], Xs),
|
||||||
NotHexBin = iolist_to_binary(NotHexList),
|
NotHexBin = iolist_to_binary(NotHexList),
|
||||||
|
@ -890,7 +794,7 @@ timing_demo_test2() ->
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
put(hack_length, 29),
|
put(hack_length, 29),
|
||||||
put(hack_different, false),
|
put(hack_different, false),
|
||||||
{Sorted, _Leftover} = split_checksum_list_blob(NotHexBin),
|
{Sorted, _Leftover} = machi_checksums:split_checksum_list_blob(NotHexBin),
|
||||||
io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]),
|
io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]),
|
||||||
Sorted
|
Sorted
|
||||||
end),
|
end),
|
||||||
|
@ -964,7 +868,7 @@ sort_input_fun(FH, PrevStuff) ->
|
||||||
true ->
|
true ->
|
||||||
<<PrevStuff/binary, NewStuff/binary>>
|
<<PrevStuff/binary, NewStuff/binary>>
|
||||||
end,
|
end,
|
||||||
{SplitRes, Leftover} = split_checksum_list_blob(AllStuff),
|
{SplitRes, Leftover} = machi_checksums:split_checksum_list_blob(AllStuff),
|
||||||
{SplitRes, sort_input_fun(FH, Leftover)};
|
{SplitRes, sort_input_fun(FH, Leftover)};
|
||||||
eof ->
|
eof ->
|
||||||
end_of_input
|
end_of_input
|
||||||
|
|
31
test/machi_csum_table_test.erl
Normal file
31
test/machi_csum_table_test.erl
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
-module(machi_csum_table_test).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
smoke_test() ->
|
||||||
|
Filename = "./temp-checksum-dumb-file",
|
||||||
|
_ = file:delete(Filename),
|
||||||
|
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||||
|
{Offset, Size, Checksum} = {64, 34, <<"deadbeef">>},
|
||||||
|
{error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size),
|
||||||
|
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
||||||
|
{ok, Checksum} = machi_csum_table:find(MC, Offset, Size),
|
||||||
|
ok = machi_csum_table:trim(MC, Offset, Size),
|
||||||
|
{error, trimmed} = machi_csum_table:find(MC, Offset, Size),
|
||||||
|
ok = machi_csum_table:close(MC),
|
||||||
|
ok = machi_csum_table:delete(MC).
|
||||||
|
|
||||||
|
close_test() ->
|
||||||
|
Filename = "./temp-checksum-dumb-file-2",
|
||||||
|
_ = file:delete(Filename),
|
||||||
|
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||||
|
{Offset, Size, Checksum} = {64, 34, <<"deadbeef">>},
|
||||||
|
{error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size),
|
||||||
|
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
||||||
|
{ok, Checksum} = machi_csum_table:find(MC, Offset, Size),
|
||||||
|
ok = machi_csum_table:close(MC),
|
||||||
|
|
||||||
|
{ok, MC2} = machi_csum_table:open(Filename, []),
|
||||||
|
{ok, Checksum} = machi_csum_table:find(MC2, Offset, Size),
|
||||||
|
ok = machi_csum_table:trim(MC2, Offset, Size),
|
||||||
|
{error, trimmed} = machi_csum_table:find(MC2, Offset, Size),
|
||||||
|
ok = machi_csum_table:delete(MC2).
|
Loading…
Reference in a new issue