Change checksum_list API to return a t2b list
This commit is contained in:
parent
befa776685
commit
07c2b97918
5 changed files with 32 additions and 36 deletions
|
@ -100,7 +100,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
try
|
||||
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
|
||||
{ok, InfoBin} ->
|
||||
{Info, _} = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
Info = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
|
||||
[], Info),
|
||||
{ok, Res};
|
||||
|
@ -115,7 +115,9 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
end.
|
||||
|
||||
verify_chunk_checksum(File, ReadChunk) ->
|
||||
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
fun({0, ?MINIMUM_OFFSET, none}, []) ->
|
||||
[];
|
||||
({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
case ReadChunk(File, Offset, Size) of
|
||||
{ok, {[{_, Offset, Chunk, _}], _}} ->
|
||||
CSum2 = machi_util:checksum_chunk(Chunk),
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
all_trimmed/2,
|
||||
calc_unwritten_bytes/1,
|
||||
split_checksum_list_blob_decode/1,
|
||||
all/1,
|
||||
close/1, delete/1,
|
||||
foldl_chunks/3]).
|
||||
|
||||
|
@ -15,7 +16,6 @@
|
|||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-export([all/1]).
|
||||
-endif.
|
||||
|
||||
-record(machi_csum_table,
|
||||
|
@ -49,9 +49,8 @@ open(CSumFilename, _Opts) ->
|
|||
{ok, C0}.
|
||||
|
||||
-spec split_checksum_list_blob_decode(binary())-> term().
|
||||
split_checksum_list_blob_decode(_Bin) ->
|
||||
%% binary_to_term(Bin)
|
||||
throw(not_yet).
|
||||
split_checksum_list_blob_decode(Bin) ->
|
||||
erlang:binary_to_term(Bin).
|
||||
|
||||
-spec find(table(), machi_dt:file_offset(), machi_dt:file_size()) ->
|
||||
list({machi_dt:file_offset(),
|
||||
|
|
|
@ -57,7 +57,8 @@
|
|||
write/4,
|
||||
trim/4,
|
||||
append/2,
|
||||
append/4
|
||||
append/4,
|
||||
checksum_list/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -210,6 +211,10 @@ append(_Pid, ClientMeta, Extra, _Data) ->
|
|||
lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]),
|
||||
{error, bad_arg}.
|
||||
|
||||
-spec checksum_list(pid()) -> {ok, list()}.
|
||||
checksum_list(Pid) ->
|
||||
gen_server:call(Pid, {checksum_list}, ?TIMEOUT).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
||||
% @private
|
||||
|
@ -430,6 +435,10 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
|
|||
{reply, Resp, State#state{appends = {T+1, NewErr},
|
||||
eof_position = NewEof}};
|
||||
|
||||
handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) ->
|
||||
All = machi_csum_table:all(T),
|
||||
{reply, {ok, All}, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:warning("Unknown call: ~p", [Req]),
|
||||
{reply, whoaaaaaaaaaaaa, State}.
|
||||
|
|
|
@ -595,26 +595,22 @@ do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
|||
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
||||
case sanitize_file_string(File) of
|
||||
ok ->
|
||||
ok = sync_checksum_file(FluName, File),
|
||||
CSumPath = machi_util:make_checksum_filename(DataDir, File),
|
||||
%% TODO: If this file is legitimately bigger than our
|
||||
%% {packet_size,N} limit, then we'll have a difficult time, eh?
|
||||
case file:read_file(CSumPath) of
|
||||
{ok, Bin} ->
|
||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||
{ok, Pid} ->
|
||||
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
||||
Bin = erlang:term_to_binary(List),
|
||||
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
||||
%% TODO: Fix this limitation by streaming the
|
||||
%% binary in multiple smaller PB messages.
|
||||
%% Also, don't read the file all at once. ^_^
|
||||
error_logger:error_msg("~s:~w oversize ~s\n",
|
||||
[?MODULE, ?LINE, CSumPath]),
|
||||
[?MODULE, ?LINE, DataDir]),
|
||||
{error, bad_arg};
|
||||
true ->
|
||||
{ok, Bin}
|
||||
end;
|
||||
{error, enoent} ->
|
||||
{error, no_such_file};
|
||||
{error, _} ->
|
||||
{error, bad_arg}
|
||||
{error, trimmed} ->
|
||||
{error, trimmed}
|
||||
end;
|
||||
_ ->
|
||||
{error, bad_arg}
|
||||
|
@ -728,21 +724,6 @@ sanitize_prefix(Prefix) ->
|
|||
error
|
||||
end.
|
||||
|
||||
sync_checksum_file(FluName, File) ->
|
||||
%% We just lookup the pid here - we don't start a proxy server. If
|
||||
%% there isn't a pid for this file, then we just return ok. The
|
||||
%% csum file was synced when the proxy was shutdown.
|
||||
%%
|
||||
%% If there *is* a pid, we call the sync function to ensure the
|
||||
%% csum file is sync'd before we return. (Or an error if we get
|
||||
%% an error).
|
||||
case machi_flu_metadata_mgr:lookup_proxy_pid(FluName, {file, File}) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
machi_file_proxy:sync(Pid, csum)
|
||||
end.
|
||||
|
||||
make_listener_regname(BaseName) ->
|
||||
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
||||
|
||||
|
|
|
@ -33,7 +33,11 @@
|
|||
-define(FLU_C, machi_flu1_client).
|
||||
|
||||
verify_file_checksums_test_() ->
|
||||
{timeout, 60, fun() -> verify_file_checksums_test2() end}.
|
||||
{setup,
|
||||
fun() -> os:cmd("rm -rf ./data") end,
|
||||
fun(_) -> os:cmd("rm -rf ./data") end,
|
||||
{timeout, 60, fun() -> verify_file_checksums_test2() end}
|
||||
}.
|
||||
|
||||
verify_file_checksums_test2() ->
|
||||
Host = "localhost",
|
||||
|
@ -50,8 +54,9 @@ verify_file_checksums_test2() ->
|
|||
Prefix, <<X:(X*8)/big>>) ||
|
||||
X <- lists:seq(1, NumChunks)],
|
||||
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
|
||||
{ok, []} = machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
|
||||
?assertEqual({ok, []},
|
||||
machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File)),
|
||||
|
||||
%% Clobber the first 3 chunks, which are sizes 1/2/3.
|
||||
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
|
||||
|
|
Loading…
Reference in a new issue