diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index fb6dedb..46f6c3d 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -100,7 +100,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) -> try case ?FLU_C:checksum_list(Sock1, EpochID, File) of {ok, InfoBin} -> - {Info, _} = machi_csum_table:split_checksum_list_blob_decode(InfoBin), + Info = machi_csum_table:split_checksum_list_blob_decode(InfoBin), Res = lists:foldl(verify_chunk_checksum(File, ReadChunk), [], Info), {ok, Res}; @@ -115,7 +115,9 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) -> end. verify_chunk_checksum(File, ReadChunk) -> - fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> + fun({0, ?MINIMUM_OFFSET, none}, []) -> + []; + ({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> case ReadChunk(File, Offset, Size) of {ok, {[{_, Offset, Chunk, _}], _}} -> CSum2 = machi_util:checksum_chunk(Chunk), diff --git a/src/machi_csum_table.erl b/src/machi_csum_table.erl index 0b7be8e..88cc906 100644 --- a/src/machi_csum_table.erl +++ b/src/machi_csum_table.erl @@ -8,6 +8,7 @@ all_trimmed/2, calc_unwritten_bytes/1, split_checksum_list_blob_decode/1, + all/1, close/1, delete/1, foldl_chunks/3]). @@ -15,7 +16,6 @@ -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). --export([all/1]). -endif. -record(machi_csum_table, @@ -49,9 +49,8 @@ open(CSumFilename, _Opts) -> {ok, C0}. -spec split_checksum_list_blob_decode(binary())-> term(). -split_checksum_list_blob_decode(_Bin) -> - %% binary_to_term(Bin) - throw(not_yet). +split_checksum_list_blob_decode(Bin) -> + erlang:binary_to_term(Bin). -spec find(table(), machi_dt:file_offset(), machi_dt:file_size()) -> list({machi_dt:file_offset(), diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index 980d5a7..480aabd 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -57,7 +57,8 @@ write/4, trim/4, append/2, - append/4 + append/4, + checksum_list/1 ]). %% gen_server callbacks @@ -210,6 +211,10 @@ append(_Pid, ClientMeta, Extra, _Data) -> lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]), {error, bad_arg}. +-spec checksum_list(pid()) -> {ok, list()}. +checksum_list(Pid) -> + gen_server:call(Pid, {checksum_list}, ?TIMEOUT). + %% gen_server callbacks % @private @@ -430,6 +435,10 @@ handle_call({append, ClientMeta, Extra, Data}, _From, {reply, Resp, State#state{appends = {T+1, NewErr}, eof_position = NewEof}}; +handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) -> + All = machi_csum_table:all(T), + {reply, {ok, All}, State}; + handle_call(Req, _From, State) -> lager:warning("Unknown call: ~p", [Req]), {reply, whoaaaaaaaaaaaa, State}. diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 042eaed..c75a75d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -595,26 +595,22 @@ do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> case sanitize_file_string(File) of ok -> - ok = sync_checksum_file(FluName, File), - CSumPath = machi_util:make_checksum_filename(DataDir, File), - %% TODO: If this file is legitimately bigger than our - %% {packet_size,N} limit, then we'll have a difficult time, eh? - case file:read_file(CSumPath) of - {ok, Bin} -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + {ok, List} = machi_file_proxy:checksum_list(Pid), + Bin = erlang:term_to_binary(List), if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> %% TODO: Fix this limitation by streaming the %% binary in multiple smaller PB messages. %% Also, don't read the file all at once. ^_^ error_logger:error_msg("~s:~w oversize ~s\n", - [?MODULE, ?LINE, CSumPath]), + [?MODULE, ?LINE, DataDir]), {error, bad_arg}; true -> {ok, Bin} end; - {error, enoent} -> - {error, no_such_file}; - {error, _} -> - {error, bad_arg} + {error, trimmed} -> + {error, trimmed} end; _ -> {error, bad_arg} @@ -728,21 +724,6 @@ sanitize_prefix(Prefix) -> error end. -sync_checksum_file(FluName, File) -> - %% We just lookup the pid here - we don't start a proxy server. If - %% there isn't a pid for this file, then we just return ok. The - %% csum file was synced when the proxy was shutdown. - %% - %% If there *is* a pid, we call the sync function to ensure the - %% csum file is sync'd before we return. (Or an error if we get - %% an error). - case machi_flu_metadata_mgr:lookup_proxy_pid(FluName, {file, File}) of - undefined -> - ok; - Pid -> - machi_file_proxy:sync(Pid, csum) - end. - make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener"). diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 54a75f4..e00fa31 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -33,7 +33,11 @@ -define(FLU_C, machi_flu1_client). verify_file_checksums_test_() -> - {timeout, 60, fun() -> verify_file_checksums_test2() end}. + {setup, + fun() -> os:cmd("rm -rf ./data") end, + fun(_) -> os:cmd("rm -rf ./data") end, + {timeout, 60, fun() -> verify_file_checksums_test2() end} + }. verify_file_checksums_test2() -> Host = "localhost", @@ -50,8 +54,9 @@ verify_file_checksums_test2() -> Prefix, <>) || X <- lists:seq(1, NumChunks)], {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), - {ok, []} = machi_admin_util:verify_file_checksums_remote( - Host, TcpPort, ?DUMMY_PV1_EPOCH, File), + ?assertEqual({ok, []}, + machi_admin_util:verify_file_checksums_remote( + Host, TcpPort, ?DUMMY_PV1_EPOCH, File)), %% Clobber the first 3 chunks, which are sizes 1/2/3. {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),