Fix client API for file list & checksum list

This commit is contained in:
Scott Lystig Fritchie 2015-04-01 17:59:40 +09:00
parent d243ffca23
commit 5c20ee6337
4 changed files with 111 additions and 12 deletions

39
src/machi_admin_util.erl Normal file
View file

@ -0,0 +1,39 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_admin_util).
-export([
verify_file_checksums_remote/2, verify_file_checksums_remote/3
]).
-compile(export_all).
-include("machi.hrl").
verify_file_checksums_remote(Sock, File) ->
verify_file_checksums_remote2(Sock, Sock, File).
verify_file_checksums_remote(_Host, _TcpPort, File) ->
verify_file_checksums_remote2(todo, todo, File).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
verify_file_checksums_remote2(Sock, Sock, File) ->
todo.

View file

@ -311,6 +311,8 @@ do_net_server_checksum_listing(Sock, File, DataDir) ->
end. end.
do_net_server_checksum_listing2(Sock, File, DataDir) -> do_net_server_checksum_listing2(Sock, File, DataDir) ->
ok = sync_checksum_file(File),
CSumPath = machi_util:make_checksum_filename(DataDir, File), CSumPath = machi_util:make_checksum_filename(DataDir, File),
case file:open(CSumPath, [read, raw, binary]) of case file:open(CSumPath, [read, raw, binary]) of
{ok, FH} -> {ok, FH} ->
@ -330,6 +332,29 @@ do_net_server_checksum_listing2(Sock, File, DataDir) ->
ok = gen_tcp:send(Sock, "ERROR\n") ok = gen_tcp:send(Sock, "ERROR\n")
end. end.
sync_checksum_file(File) ->
Prefix = re:replace(File, "\\..*", "", [{return, binary}]),
case write_server_find_pid(Prefix) of
undefined ->
ok;
Pid ->
Ref = make_ref(),
Pid ! {sync_stuff, self(), Ref},
receive
{sync_finished, Ref} ->
ok
after 5000 ->
case write_server_find_pid(Prefix) of
undefined ->
ok;
Pid2 when Pid2 /= Pid ->
ok;
_Pid2 ->
error
end
end
end.
do_net_copy_bytes(FH, Sock) -> do_net_copy_bytes(FH, Sock) ->
case file:read(FH, 1024*1024) of case file:read(FH, 1024*1024) of
{ok, Bin} -> {ok, Bin} ->
@ -384,8 +409,7 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
end. end.
write_server_get_pid(Prefix, DataDir) -> write_server_get_pid(Prefix, DataDir) ->
RegName = machi_util:make_regname(Prefix), case write_server_find_pid(Prefix) of
case whereis(RegName) of
undefined -> undefined ->
start_seq_append_server(Prefix, DataDir), start_seq_append_server(Prefix, DataDir),
timer:sleep(1), timer:sleep(1),
@ -394,6 +418,10 @@ write_server_get_pid(Prefix, DataDir) ->
Pid Pid
end. end.
write_server_find_pid(Prefix) ->
RegName = machi_util:make_regname(Prefix),
whereis(RegName).
start_seq_append_server(Prefix, DataDir) -> start_seq_append_server(Prefix, DataDir) ->
spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end). spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end).
@ -453,7 +481,12 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info), ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_, seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len) FileNum, Offset + Len);
{sync_stuff, FromPid, Ref} ->
file:sync(FHc),
FromPid ! {sync_finished, Ref},
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset)
after 30*1000 -> after 30*1000 ->
ok = file:close(FHd), ok = file:close(FHd),
ok = file:close(FHc), ok = file:close(FHc),

View file

@ -285,7 +285,10 @@ list2(Sock) ->
list2({ok, <<".\n">>}, _Sock) -> list2({ok, <<".\n">>}, _Sock) ->
[]; [];
list2({ok, Line}, Sock) -> list2({ok, Line}, Sock) ->
[Line|list2(gen_tcp:recv(Sock, 0), Sock)]; FileLen = byte_size(Line) - 16 - 1 - 1,
<<SizeHex:16/binary, " ", File:FileLen/binary, _/binary>> = Line,
Size = machi_util:hexstr_to_int(SizeHex),
[{Size, File}|list2(gen_tcp:recv(Sock, 0), Sock)];
list2(Else, _Sock) -> list2(Else, _Sock) ->
throw({server_protocol_error, Else}). throw({server_protocol_error, Else}).
@ -300,7 +303,7 @@ checksum_list2(Sock, File) ->
<<LenHex:RestLen/binary, _:1/binary>> = Rest, <<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex), <<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]), ok = inet:setopts(Sock, [{packet, raw}]),
checksum_list_fast(Sock, Len); {ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file}; {error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} -> {ok, <<"ERROR BAD-ARG", _/binary>>} ->
@ -323,6 +326,22 @@ checksum_list_fast(Sock, Remaining) ->
{ok, Bytes} = gen_tcp:recv(Sock, Num), {ok, Bytes} = gen_tcp:recv(Sock, Num),
[Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))]. [Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
checksum_list_finish(Chunks) ->
Bin = case Chunks of
[X] ->
X;
_ ->
iolist_to_binary(Chunks)
end,
[begin
CSumLen = byte_size(Line) - 16 - 1 - 8 - 1,
<<OffsetHex:16/binary, " ", SizeHex:8/binary, " ",
CSum:CSumLen/binary>> = Line,
{machi_util:hexstr_to_int(OffsetHex),
machi_util:hexstr_to_int(SizeHex),
machi_util:hexstr_to_bin(CSum)}
end || Line <- re:split(Bin, "\n", [{return, binary}]),
Line /= <<>>].
write_chunk2(Sock, File0, Offset, Chunk0) -> write_chunk2(Sock, File0, Offset, Chunk0) ->
try try

View file

@ -28,15 +28,20 @@
-define(FLU, machi_flu1). -define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client). -define(FLU_C, machi_flu1_client).
setup_test_flu(RegName, TcpPort, DataDir) ->
clean_up_data_dir(DataDir),
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}]),
FLU1.
flu_smoke_test() -> flu_smoke_test() ->
Host = "localhost", Host = "localhost",
TcpPort = 32957, TcpPort = 32957,
DataDir = "./data", DataDir = "./data",
Prefix = <<"prefix!">>, Prefix = <<"prefix!">>,
BadPrefix = BadFile = "no/good", BadPrefix = BadFile = "no/good",
clean_up_data_dir(DataDir),
{ok, FLU1} = ?FLU:start_link([{smoke_flu, TcpPort, DataDir}]), FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir),
try try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort, {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
"does-not-exist"), "does-not-exist"),
@ -48,21 +53,24 @@ flu_smoke_test() ->
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
Prefix, Chunk1), Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1), {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1),
{ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort, File1),
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
BadPrefix, Chunk1), BadPrefix, Chunk1),
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort),
Len1 = size(Chunk1),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
File1, Off1*983, Len1),
{error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
File1, Off1, Len1*984),
Chunk2 = <<"yo yo">>, Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2), Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77, Off2 = ?MINIMUM_OFFSET + 77,
File2 = "smoke-file", File2 = "smoke-prefix",
ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2), ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort,
BadFile, Off2, Chunk2), BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, File2, Off2, Len2), {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, File2, Off2, Len2),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
File2, Off2*983, Len2),
{error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
File2, Off2, Len2*984),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
"no!!", Off2, Len2), "no!!", Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,