From 5c20ee633738bb0fa61163a4a25c798b86f1247a Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 1 Apr 2015 17:59:40 +0900 Subject: [PATCH] Fix client API for file list & checksum list --- src/machi_admin_util.erl | 39 +++++++++++++++++++++++++++++++++++++++ src/machi_flu1.erl | 39 ++++++++++++++++++++++++++++++++++++--- src/machi_flu1_client.erl | 23 +++++++++++++++++++++-- test/machi_flu1_test.erl | 22 +++++++++++++++------- 4 files changed, 111 insertions(+), 12 deletions(-) create mode 100644 src/machi_admin_util.erl diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl new file mode 100644 index 0000000..6a9bdc7 --- /dev/null +++ b/src/machi_admin_util.erl @@ -0,0 +1,39 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_admin_util). + +-export([ + verify_file_checksums_remote/2, verify_file_checksums_remote/3 + ]). +-compile(export_all). + +-include("machi.hrl"). + +verify_file_checksums_remote(Sock, File) -> + verify_file_checksums_remote2(Sock, Sock, File). + +verify_file_checksums_remote(_Host, _TcpPort, File) -> + verify_file_checksums_remote2(todo, todo, File). + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +verify_file_checksums_remote2(Sock, Sock, File) -> + todo. diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index d78de9d..5a5f04e 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -311,6 +311,8 @@ do_net_server_checksum_listing(Sock, File, DataDir) -> end. do_net_server_checksum_listing2(Sock, File, DataDir) -> + ok = sync_checksum_file(File), + CSumPath = machi_util:make_checksum_filename(DataDir, File), case file:open(CSumPath, [read, raw, binary]) of {ok, FH} -> @@ -330,6 +332,29 @@ do_net_server_checksum_listing2(Sock, File, DataDir) -> ok = gen_tcp:send(Sock, "ERROR\n") end. +sync_checksum_file(File) -> + Prefix = re:replace(File, "\\..*", "", [{return, binary}]), + case write_server_find_pid(Prefix) of + undefined -> + ok; + Pid -> + Ref = make_ref(), + Pid ! {sync_stuff, self(), Ref}, + receive + {sync_finished, Ref} -> + ok + after 5000 -> + case write_server_find_pid(Prefix) of + undefined -> + ok; + Pid2 when Pid2 /= Pid -> + ok; + _Pid2 -> + error + end + end + end. + do_net_copy_bytes(FH, Sock) -> case file:read(FH, 1024*1024) of {ok, Bin} -> @@ -384,8 +409,7 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) -> end. write_server_get_pid(Prefix, DataDir) -> - RegName = machi_util:make_regname(Prefix), - case whereis(RegName) of + case write_server_find_pid(Prefix) of undefined -> start_seq_append_server(Prefix, DataDir), timer:sleep(1), @@ -394,6 +418,10 @@ write_server_get_pid(Prefix, DataDir) -> Pid end. +write_server_find_pid(Prefix) -> + RegName = machi_util:make_regname(Prefix), + whereis(RegName). + start_seq_append_server(Prefix, DataDir) -> spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end). @@ -453,7 +481,12 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], ok = file:write(FHc, CSum_info), seq_append_server_loop(DataDir, Prefix, File, FH_, - FileNum, Offset + Len) + FileNum, Offset + Len); + {sync_stuff, FromPid, Ref} -> + file:sync(FHc), + FromPid ! {sync_finished, Ref}, + seq_append_server_loop(DataDir, Prefix, File, FH_, + FileNum, Offset) after 30*1000 -> ok = file:close(FHd), ok = file:close(FHc), diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 3cccf00..7e8bc1b 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -285,7 +285,10 @@ list2(Sock) -> list2({ok, <<".\n">>}, _Sock) -> []; list2({ok, Line}, Sock) -> - [Line|list2(gen_tcp:recv(Sock, 0), Sock)]; + FileLen = byte_size(Line) - 16 - 1 - 1, + <> = Line, + Size = machi_util:hexstr_to_int(SizeHex), + [{Size, File}|list2(gen_tcp:recv(Sock, 0), Sock)]; list2(Else, _Sock) -> throw({server_protocol_error, Else}). @@ -300,7 +303,7 @@ checksum_list2(Sock, File) -> <> = Rest, <> = machi_util:hexstr_to_bin(LenHex), ok = inet:setopts(Sock, [{packet, raw}]), - checksum_list_fast(Sock, Len); + {ok, checksum_list_finish(checksum_list_fast(Sock, Len))}; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {error, no_such_file}; {ok, <<"ERROR BAD-ARG", _/binary>>} -> @@ -323,6 +326,22 @@ checksum_list_fast(Sock, Remaining) -> {ok, Bytes} = gen_tcp:recv(Sock, Num), [Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))]. +checksum_list_finish(Chunks) -> + Bin = case Chunks of + [X] -> + X; + _ -> + iolist_to_binary(Chunks) + end, + [begin + CSumLen = byte_size(Line) - 16 - 1 - 8 - 1, + <> = Line, + {machi_util:hexstr_to_int(OffsetHex), + machi_util:hexstr_to_int(SizeHex), + machi_util:hexstr_to_bin(CSum)} + end || Line <- re:split(Bin, "\n", [{return, binary}]), + Line /= <<>>]. write_chunk2(Sock, File0, Offset, Chunk0) -> try diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index d51143c..2b840a2 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -28,15 +28,20 @@ -define(FLU, machi_flu1). -define(FLU_C, machi_flu1_client). +setup_test_flu(RegName, TcpPort, DataDir) -> + clean_up_data_dir(DataDir), + + {ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}]), + FLU1. + flu_smoke_test() -> Host = "localhost", TcpPort = 32957, DataDir = "./data", Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", - clean_up_data_dir(DataDir), - {ok, FLU1} = ?FLU:start_link([{smoke_flu, TcpPort, DataDir}]), + FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir), try {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort, "does-not-exist"), @@ -48,21 +53,24 @@ flu_smoke_test() -> {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, Prefix, Chunk1), {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1), + {ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort, File1), {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, BadPrefix, Chunk1), + {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort), + Len1 = size(Chunk1), + {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, + File1, Off1*983, Len1), + {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort, + File1, Off1, Len1*984), Chunk2 = <<"yo yo">>, Len2 = byte_size(Chunk2), Off2 = ?MINIMUM_OFFSET + 77, - File2 = "smoke-file", + File2 = "smoke-prefix", ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2), {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, BadFile, Off2, Chunk2), {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, File2, Off2, Len2), - {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, - File2, Off2*983, Len2), - {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort, - File2, Off2, Len2*984), {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, "no!!", Off2, Len2), {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,