machi/prototype/demo-day-hack/file0.erl
Scott Lystig Fritchie 29868678a4 Add file0_test.escript (and big squash)
Small cleanups

Small cleanups

Refactoring argnames & order for more consistency

Add server-side-calculated MD5 checksum + logging

file:consult() style checksum management, too slow! 513K csums = 105 seconds, ouch

Much faster checksum recording

Add checksum_list. Alas, line-by-line I/O is slow, neh?

Much faster checksum listing

Add file0_verify_checksums.escript and supporting code

Adjust escript +A and -smp flags

Add file0_compare_filelists.escript

First draft of file0_repair_server.escript

First draft of file0_repair_server.escript, part 2

WIP of file0_repair_server.escript, part 3

WIP of file0_repair_server.escript, part 4

Basic repair works, it seems, hooray!

When checksum file ordering is different, try a cheap(?) 'cmp' on sorted results instead

Add README.md

Initial import of szone_chash.erl

Add file0_cc_make_projection.escript and supporting code

Add file0_cc_map_prefix.escript and supporting code

Change think-o: hash output is a chain, silly boy

Add file0_cc_1file_write_redundant.escript and support

Add file0_cc_read_client.escript and supporting code

Add examples/servers.map & file0_start_servers.escript

WIP: working on file0_cc_migrate_files.escript

File migration finished, works, yay!

Add basic 'what am I' docs to each script

Add file0_server_daemon.escript

Minor fixes

Fix broken unit test

Add basho_bench run() commands for append & read ops with projection

Add to examples dir

WIP: erasure coding hack, part 1

Fix broken unit test

WIP: erasure coding hack, part 2

WIP: erasure coding hack, part 3, EC data write is finished!

WIP: erasure coding hack, part 4, EC data read still in progress

WIP: erasure coding hack, part 5, EC data read still in progress

WIP: erasure coding hack, part 5b, EC data read still in progress

WIP: erasure coding hack, EC data read finished!

README update, part 1

README update, part 2

Oops, put back the printed output for file-write-client and 1file-write-redundant-client

README update, part 3

Fix 'user' output bug in list-client

Ugly hacks to get output/no-output from write clients

Clean up minor output bugs

Clean up minor output bugs, part 2

README update, part 4

Clean up minor output bugs, part 3

Clean up minor output bugs, part 5

Clean up minor output bugs, part 6

README update, part 6

README update, part 7

README update, part 7

README update, part 8

Final edits/fixes for demo day

Fix another oops in the README/demo day script
2015-03-02 20:57:17 +09:00

1546 lines
59 KiB
Erlang

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-ifndef(NO_MODULE).
-module(file0).
-compile(export_all).
-endif.
%% -mode(compile). % for escript use
-include("cc.hrl").
-include_lib("kernel/include/file.hrl").
-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes
%% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
-define(DATA_DIR, "./data").
-define(MINIMUM_OFFSET, 1024).
%% @doc Ask the append server Server to append Chunk under Prefix.
%% The MD5-style checksum is computed here, client side, and shipped
%% with the request.
%% Returns {Offset, File} on success, or the atom 'bummer' when no
%% assignment arrives within 10 seconds.
%% NOTE(review): the reply carries no request reference, so an
%% assignment that arrives after the timeout stays in the mailbox and
%% could be mistaken for the reply to a later call.
append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) ->
    Server ! {seq_append, self(), Prefix, Chunk, checksum(Chunk)},
    receive
        {assignment, Offset, File} -> {Offset, File}
    after 10000 ->
        bummer
    end.
%% @doc Spawn (linked) the front-end append server.
%% Defaults: registered name ?MODULE, data directory ?DATA_DIR.
start_append_server() ->
    start_append_server(?MODULE, ?DATA_DIR).
start_append_server(Name) ->
    start_append_server(Name, ?DATA_DIR).
start_append_server(Name, DataDir) ->
    Body = fun() -> run_append_server(Name, DataDir) end,
    spawn_link(Body).
%% @doc Spawn (linked) the TCP listener on Port.
%% The data directory defaults to ?DATA_DIR.
start_listen_server(Port) ->
    start_listen_server(Port, ?DATA_DIR).
start_listen_server(Port, DataDir) ->
    Body = fun() -> run_listen_server(Port, DataDir) end,
    spawn_link(Body).
%% Process body of the front-end append server: claim the registered
%% Name (crashes with badarg if it is taken), then serve forever.
run_append_server(Name, DataDir) ->
    true = register(Name, self()),
    append_server_loop(DataDir).
%% Process body of the TCP listener.
%% Accepted sockets inherit the passive, binary, line-oriented options
%% set on the listen socket.
run_listen_server(Port, DataDir) ->
    ListenOpts = [{reuseaddr, true}, {mode, binary},
                  {active, false}, {packet, line}],
    {ok, ListenSock} = gen_tcp:listen(Port, ListenOpts),
    listen_server_loop(ListenSock, DataDir).
%% @doc Front-end append server loop.
%%
%% Each {seq_append, From, Prefix, Chunk, CSum} request is handed to a
%% short-lived dispatcher process. The dispatcher locates (or starts) the
%% per-prefix sequencer and forwards the request to it. Anything else
%% is logged and dropped. Without that catch-all, unmatched messages
%% would accumulate in this long-lived registered process's mailbox
%% forever.
append_server_loop(DataDir) ->
    receive
        {seq_append, From, Prefix, Chunk, CSum} ->
            spawn(fun() ->
                          append_server_dispatch(From, Prefix, Chunk, CSum,
                                                 DataDir)
                  end),
            append_server_loop(DataDir);
        Unexpected ->
            error_logger:warning_msg("~p: dropping unexpected message ~P\n",
                                     [?MODULE, Unexpected, 20]),
            append_server_loop(DataDir)
    end.
%% @doc Accept loop: one net_server_loop/2 worker per connection.
%%
%% Socket ownership is handed to the worker. Otherwise this long-lived
%% listener stays the owner of every accepted socket, and the socket of a
%% worker that crashes (e.g. on a badmatch of bad client input) is never
%% closed. The worker waits for 'socket_ready' so it only starts using the
%% socket after the hand-over. If the hand-over fails (e.g. the worker
%% already died), the socket is closed here instead.
listen_server_loop(LSock, DataDir) ->
    {ok, Sock} = gen_tcp:accept(LSock),
    Pid = spawn(fun() ->
                        receive
                            {socket_ready, Sock} -> ok
                        end,
                        net_server_loop(Sock, DataDir)
                end),
    case gen_tcp:controlling_process(Sock, Pid) of
        ok -> ok;
        {error, _} -> catch gen_tcp:close(Sock)
    end,
    Pid ! {socket_ready, Sock},
    listen_server_loop(LSock, DataDir).
%% @doc Per-connection command loop of the TCP service.
%%
%% Reads one command line at a time (60 s timeout), dispatches it, and
%% loops. On a receive error or timeout, on QUIT, or on an unparsable
%% line, the socket is closed and the process exits normally.
%%
%% Commands ("\n" or "\r\n" terminated unless noted otherwise):
%%   A <LenHex:8> <Prefix>                    append a chunk
%%   R <OffsetHex:16> <LenHex:8> <File>       read a chunk
%%   L                                        list files
%%   C <File>                                 list checksums of File
%%   QUIT                                     close the connection
%%   W-repl <OffsetHex:16> <LenHex:8> <File>  replication write (LF only)
%%   DEL-migration <File>                     migration delete (LF only)
%%   TRUNC-hack--- <File>                     EC truncate hack (LF only)
net_server_loop(Sock, DataDir) ->
    ok = inet:setopts(Sock, [{packet, line}]),
    case gen_tcp:recv(Sock, 0, 60*1000) of
        {ok, Line} ->
            %% verb("Got: ~p\n", [Line]),
            %% Size of the variable-length trailing field for each command
            %% shape. For short lines some of these are negative; a
            %% negative size never matches, so those patterns just fail.
            PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1,
            PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2,
            FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1,
            FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2,
            CSumFileLenLF = byte_size(Line) - 2 - 1,
            CSumFileLenCRLF = byte_size(Line) - 2 - 2,
            WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1,
            DelFileLenLF = byte_size(Line) - 14 - 1,
            case Line of
                %% For normal use. Each CRLF form is tried BEFORE its LF
                %% twin: a CRLF line also ends in "\n", so the LF pattern
                %% would otherwise match it and leave a stray "\r" at the
                %% end of the prefix / file name.
                <<"A ", LenHex:8/binary, " ",
                  Prefix:PrefixLenCRLF/binary, "\r\n">> ->
                    do_net_server_append(Sock, LenHex, Prefix);
                <<"A ", LenHex:8/binary, " ",
                  Prefix:PrefixLenLF/binary, "\n">> ->
                    do_net_server_append(Sock, LenHex, Prefix);
                <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
                  File:FileLenCRLF/binary, "\r\n">> ->
                    do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
                <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
                  File:FileLenLF/binary, "\n">> ->
                    do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
                <<"L\n">> ->
                    do_net_server_listing(Sock, DataDir);
                <<"L\r\n">> ->
                    do_net_server_listing(Sock, DataDir);
                <<"C ", File:CSumFileLenCRLF/binary, "\r\n">> ->
                    do_net_server_checksum_listing(Sock, File, DataDir);
                <<"C ", File:CSumFileLenLF/binary, "\n">> ->
                    do_net_server_checksum_listing(Sock, File, DataDir);
                <<"QUIT\n">> ->
                    catch gen_tcp:close(Sock),
                    exit(normal);
                <<"QUIT\r\n">> ->
                    catch gen_tcp:close(Sock),
                    exit(normal);
                %% For "internal" replication only.
                <<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
                  File:WriteFileLenLF/binary, "\n">> ->
                    do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
                %% For data migration only.
                <<"DEL-migration ", File:DelFileLenLF/binary, "\n">> ->
                    do_net_server_delete_migration_only(Sock, File, DataDir);
                %% For erasure coding hackityhack
                <<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> ->
                    do_net_server_truncate_hackityhack(Sock, File, DataDir);
                _ ->
                    verb("Else Got: ~p\n", [Line]),
                    gen_tcp:send(Sock, "ERROR SYNTAX\n"),
                    catch gen_tcp:close(Sock),
                    exit(normal)
            end,
            net_server_loop(Sock, DataDir);
        _ ->
            catch gen_tcp:close(Sock),
            exit(normal)
    end.
%% @doc Handle "A <LenHex> <Prefix>".
%%
%% Reads exactly Len raw bytes of chunk data and forwards the append to
%% the registered ?MODULE server. Replies "OK <OffsetHex> <File>\n" with
%% the assigned location, or "TIMEOUT\n" after 10 s without an assignment.
%% NOTE(review): assignments carry no request reference; a late reply
%% to a timed-out append could be consumed by the next append on this
%% connection.
do_net_server_append(Sock, LenHex, Prefix) ->
    <<Len:32/big>> = hexstr_to_bin(LenHex),
    ok = inet:setopts(Sock, [{packet, raw}]),
    %% In raw mode, gen_tcp:recv with length 0 means "whatever is
    %% available", which would swallow the client's next command; an empty
    %% chunk must not read from the socket at all.
    {ok, Chunk} = case Len of
                      0 -> {ok, <<>>};
                      _ -> gen_tcp:recv(Sock, Len, 60*1000)
                  end,
    CSum = checksum(Chunk),
    try
        ?MODULE ! {seq_append, self(), Prefix, Chunk, CSum}
    catch error:badarg ->
            error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
    end,
    receive
        {assignment, Offset, File} ->
            OffsetHex = bin_to_hexstr(<<Offset:64/big>>),
            Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
            ok = gen_tcp:send(Sock, Out)
    after 10*1000 ->
            ok = gen_tcp:send(Sock, "TIMEOUT\n")
    end.
%% @doc Handle "R <OffsetHex> <LenHex> <File>".
%%
%% On a full read, replies "OK\n" followed by the raw bytes. A short read
%% replies "ERROR PARTIAL-READ\n". A read at EOF is delegated to
%% perhaps_do_net_server_ec_read/2, which inspects the file's first
%% ?MINIMUM_OFFSET bytes for an erasure-coding header. Any other pread
%% result replies "ERROR BAD-READ\n".
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
    ReadFun =
        fun(FH, Offset, Len) ->
                PreadResult = file:pread(FH, Offset, Len),
                case PreadResult of
                    {ok, Bytes} when byte_size(Bytes) == Len ->
                        gen_tcp:send(Sock, ["OK\n", Bytes]);
                    {ok, Short} ->
                        verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
                             [Len, byte_size(Short), FileBin, Offset]),
                        ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n");
                    eof ->
                        perhaps_do_net_server_ec_read(Sock, FH);
                    Other ->
                        verb("Else2 ~p ~p ~P\n", [Offset, Len, Other, 20]),
                        ok = gen_tcp:send(Sock, "ERROR BAD-READ\n")
                end
        end,
    do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
                                   [read, binary, raw], ReadFun).
%% @doc Shared plumbing for the read and replication-write handlers.
%%
%% Decodes the hex offset (64-bit) and length (32-bit), then opens
%% DataDir/FileBin with FileOpts. It runs DoItFun(FH, Offset, Len) and
%% always closes FH afterwards. When opening for write fails with enoent,
%% the parent directories are created and the open is retried. Any other
%% open failure replies "ERROR BAD-IO\n".
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
                               FileOpts, DoItFun) ->
    <<Offset:64/big>> = hexstr_to_bin(OffsetHex),
    <<Len:32/big>> = hexstr_to_bin(LenHex),
    {_, Path} = make_data_filename(DataDir, FileBin),
    WantsWrite = lists:member(write, FileOpts),
    case file:open(Path, FileOpts) of
        {ok, FH} ->
            try DoItFun(FH, Offset, Len)
            after file:close(FH)
            end;
        {error, enoent} when WantsWrite ->
            ok = filelib:ensure_dir(Path),
            do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin,
                                           DataDir, FileOpts, DoItFun);
        _OpenFailure ->
            ok = gen_tcp:send(Sock, "ERROR BAD-IO\n")
    end.
%% @doc Handle "W-repl <OffsetHex> <LenHex> <File>" (replication write).
%%
%% Opens the file's checksum log for appending, creating its directory on
%% first use, then hands off to do_net_server_write2/6 for the data write.
%% FHc is closed here in an 'after' clause so the BAD-IO / BAD-PWRITE and
%% crash paths no longer leak the handle. On success the checksum entry
%% has already been flushed by the close in do_net_server_write2/6; the
%% second close is harmless, and its result is ignored.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
    CSumPath = make_checksum_filename(DataDir, FileBin),
    case file:open(CSumPath, [append, raw, binary, delayed_write]) of
        {ok, FHc} ->
            try
                do_net_server_write2(Sock, OffsetHex, LenHex, FileBin,
                                     DataDir, FHc)
            after
                catch file:close(FHc)
            end;
        {error, enoent} ->
            ok = filelib:ensure_dir(CSumPath),
            do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
    end.
%% @doc Second half of a replication write.
%%
%% Reads Len raw bytes from the socket and pwrites them at Offset in the
%% data file. On success it appends "<OffsetHex> <LenHex> <CSumHex>\n" to
%% the checksum log FHc, closes FHc and replies "OK\n". A failed pwrite
%% replies "ERROR BAD-PWRITE\n".
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
    DoItFun =
        fun(FHd, Offset, Len) ->
                ok = inet:setopts(Sock, [{packet, raw}]),
                %% A zero length must not reach gen_tcp:recv: in raw mode 0
                %% means "anything available" and would consume the next
                %% command. The 60 s timeout matches the append path, so a
                %% stalled peer cannot pin this process forever.
                {ok, Chunk} = case Len of
                                  0 -> {ok, <<>>};
                                  _ -> gen_tcp:recv(Sock, Len, 60*1000)
                              end,
                CSum = checksum(Chunk),
                case file:pwrite(FHd, Offset, Chunk) of
                    ok ->
                        CSumHex = bin_to_hexstr(CSum),
                        CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
                        ok = file:write(FHc, CSum_info),
                        ok = file:close(FHc),
                        gen_tcp:send(Sock, <<"OK\n">>);
                    _Else3 ->
                        verb("Else3 ~p ~p ~p\n", [Offset, Len, _Else3]),
                        ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
                end
        end,
    do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
                                   [write, read, binary, raw], DoItFun).
%% @doc Fallback for reads past EOF: fetch the first ?MINIMUM_OFFSET bytes
%% (the header area) and let the EC decoder decide how to reply.
%% A short header replies "ERROR PARTIAL-READ2\n"; eof or an error replies
%% "ERROR BAD-PREAD\n".
perhaps_do_net_server_ec_read(Sock, FH) ->
    Header = file:pread(FH, 0, ?MINIMUM_OFFSET),
    case Header of
        {ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET ->
            decode_and_reply_net_server_ec_read(Sock, Bin);
        {ok, _Short} ->
            ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n");
        _EofOrError ->
            ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n")
    end.
%% @doc Handle "L": reply "OK\n". Then, for each entry in DataDir except
%% "config", send "<SizeHex:16> <Name>\n". The listing ends with ".\n".
do_net_server_listing(Sock, DataDir) ->
    Names = filelib:wildcard("*", DataDir) -- ["config"],
    Entry = fun(Name) ->
                    {ok, FI} = file:read_file_info(DataDir ++ "/" ++ Name),
                    Size = FI#file_info.size,
                    [bin_to_hexstr(<<Size:64/big>>), <<" ">>,
                     list_to_binary(Name), <<"\n">>]
            end,
    ok = gen_tcp:send(Sock, ["OK\n", lists:map(Entry, Names), ".\n"]).
%% @doc Handle "C <File>": stream File's checksum log.
%%
%% Reply: "OK <ByteLenHex:16>\n", then the raw log contents, then ".\n".
%% A client may either read line by line until ".\n" or read exactly
%% ByteLen bytes. A missing log replies "ERROR NO-SUCH-FILE\n"; any other
%% open failure replies "ERROR\n". The log handle is closed in an 'after'
%% clause, so a failed send or copy no longer leaks it.
do_net_server_checksum_listing(Sock, File, DataDir) ->
    CSumPath = make_checksum_filename(DataDir, File),
    case file:open(CSumPath, [read, raw, binary]) of
        {ok, FH} ->
            try
                {ok, FI} = file:read_file_info(CSumPath),
                Len = FI#file_info.size,
                LenHex = list_to_binary(bin_to_hexstr(<<Len:64/big>>)),
                ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]),
                do_net_copy_bytes(FH, Sock),
                ok = gen_tcp:send(Sock, ".\n")
            after
                file:close(FH)
            end;
        {error, enoent} ->
            ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
        _ ->
            ok = gen_tcp:send(Sock, "ERROR\n")
    end.
%% Copy the remainder of FH to Sock in 1 MiB pieces until EOF.
do_net_copy_bytes(FH, Sock) ->
    PieceSize = 1024 * 1024,
    case file:read(FH, PieceSize) of
        eof ->
            ok;
        {ok, Piece} ->
            ok = gen_tcp:send(Sock, Piece),
            do_net_copy_bytes(FH, Sock)
    end.
%% @doc Handle "DEL-migration <File>": delete a data file. It replies
%% "OK\n", "ERROR NO-SUCH-FILE\n", or "ERROR\n". Intended only for data
%% migration.
do_net_server_delete_migration_only(Sock, File, DataDir) ->
    {_, Path} = make_data_filename(DataDir, File),
    Reply = case file:delete(Path) of
                ok -> "OK\n";
                {error, enoent} -> "ERROR NO-SUCH-FILE\n";
                _ -> "ERROR\n"
            end,
    ok = gen_tcp:send(Sock, Reply).
%% @doc Handle "TRUNC-hack--- <File>": cut a data file down to exactly
%% ?MINIMUM_OFFSET bytes (keeping only the header area). Used by the
%% erasure-coding hack. Replies "OK\n", "ERROR NO-SUCH-FILE\n", or "ERROR\n".
do_net_server_truncate_hackityhack(Sock, File, DataDir) ->
    {_, Path} = make_data_filename(DataDir, File),
    case file:open(Path, [read, write, binary, raw]) of
        {error, enoent} ->
            ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
        {ok, FH} ->
            try
                {ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET),
                ok = file:truncate(FH),
                ok = gen_tcp:send(Sock, "OK\n")
            after
                file:close(FH)
            end;
        _ ->
            ok = gen_tcp:send(Sock, "ERROR\n")
    end.
%% @doc Dispatch on the file header (the first ?MINIMUM_OFFSET bytes).
%%
%% "a " marks version-a erasure-coding metadata. A leading 0 byte (the
%% unwritten header area of an ordinary data file) replies
%% "ERROR NOT-ERASURE\n". Any other header previously crashed this
%% connection with function_clause; it now replies
%% "ERROR UNKNOWN-ERASURE\n" instead.
decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) ->
    decode_and_reply_net_server_ec_read_version_a(Sock, Rest);
decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) ->
    ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>);
decode_and_reply_net_server_ec_read(Sock, _UnknownHeader) ->
    ok = gen_tcp:send(Sock, <<"ERROR UNKNOWN-ERASURE\n">>).
%% @doc Reply to a read of a version-a erasure-coded stub.
%%
%% Rest (the header after "a ") is: 4 hex digits of body length, a space,
%% a fixed HdrLen-byte header remainder, then the body. Presumably the
%% remainder is "<StripeWidthHex:16> <OrigFileLenHex:16> ..." (matching
%% the client-side parser); confirm against the EC writer. The reply is
%% "ERASURE <BodyLenHex> <Hdr><Body>".
decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
    HdrLen = 80 - 2 - 4 - 1,
    <<BodyLenHex:4/binary, " ", Hdr:HdrLen/binary, AfterHdr/binary>> = Rest,
    <<BodyLen:16/big>> = hexstr_to_bin(BodyLenHex),
    <<Body:BodyLen/binary, _Padding/binary>> = AfterHdr,
    Reply = ["ERASURE ", BodyLenHex, " ", Hdr, Body],
    ok = gen_tcp:send(Sock, Reply).
%% @doc Return the pid registered for Prefix's sequencer. If none is
%% registered, start one and poll (1 ms sleeps) until the name appears.
write_server_get_pid(Prefix, DataDir) ->
    case whereis(make_regname(Prefix)) of
        undefined ->
            start_seq_append_server(Prefix, DataDir),
            timer:sleep(1),
            write_server_get_pid(Prefix, DataDir);
        Registered ->
            Registered
    end.
%% Short-lived helper: forward one append request to the per-prefix
%% sequencer (starting it if needed), then exit.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
    Request = {seq_append, From, Prefix, Chunk, CSum},
    write_server_get_pid(Prefix, DataDir) ! Request,
    exit(normal).
%% Spawn (unlinked) a sequencer process for Prefix.
start_seq_append_server(Prefix, DataDir) ->
    Body = fun() -> run_seq_append_server(Prefix, DataDir) end,
    spawn(Body).
%% Sequencer entry point: claim the prefix's registered name, make sure
%% DataDir and DataDir/config exist, then open the first file.
%% filelib:ensure_dir/1 creates the parents of its argument, hence the
%% dummy "unused" leaf names.
run_seq_append_server(Prefix, DataDir) ->
    true = register(make_regname(Prefix), self()),
    lists:foreach(fun(Dummy) -> ok = filelib:ensure_dir(Dummy) end,
                  [DataDir ++ "/unused", DataDir ++ "/config/unused"]),
    run_seq_append_server2(Prefix, DataDir).
%% Allocate the next file number for Prefix, persist the bump, log it,
%% and start appending to that file.
run_seq_append_server2(Prefix, DataDir) ->
    NextFileNum = 1 + read_max_filenum(DataDir, Prefix),
    ok = increment_max_filenum(DataDir, Prefix),
    info_msg("start: ~p server at file ~w\n", [Prefix, NextFileNum]),
    seq_append_server_loop(DataDir, Prefix, NextFileNum).
%% Open data file FileNum and its checksum log for Prefix, then enter
%% the append loop at ?MINIMUM_OFFSET (the bytes below it are reserved
%% as the header area).
%% The sequencer name is the base-36 microsecond field of now() followed
%% by the base-36 OS pid. It is used to keep file names distinct across
%% sequencer starts.
seq_append_server_loop(DataDir, Prefix, FileNum) ->
    {_MegaSecs, _Secs, MicroSecs} = now(),
    OsPid = list_to_integer(os:getpid()),
    SequencerNameHack =
        lists:flatten(io_lib:format("~.36B~.36B", [MicroSecs, OsPid])),
    {File, DataPath} =
        make_data_filename(DataDir, Prefix, SequencerNameHack, FileNum),
    {ok, FHd} = file:open(DataPath, [write, binary, raw]),
    CSumPath =
        make_checksum_filename(DataDir, Prefix, SequencerNameHack, FileNum),
    {ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
    seq_append_server_loop(DataDir, Prefix, File, {FHd, FHc}, FileNum,
                           ?MINIMUM_OFFSET).
%% Append loop for one open data file.
%%
%% - Once Offset exceeds ?MAX_FILE_SIZE, both handles are closed and the
%%   sequencer rolls over to a new file number.
%% - Each {seq_append, ...} for this Prefix is pwritten at Offset. The
%%   caller is sent {assignment, Offset, File}, and the line
%%   "<OffsetHex> <LenHex> <CSumHex>\n" is appended to the checksum log.
%% - After 30 s without requests, the handles are closed and the process
%%   exits (which also frees its registered name).
seq_append_server_loop(DataDir, Prefix, _File, {FHd, FHc}, FileNum, Offset)
  when Offset > ?MAX_FILE_SIZE ->
    lists:foreach(fun(FH) -> ok = file:close(FH) end, [FHd, FHc]),
    info_msg("rollover: ~p server at file ~w offset ~w\n",
             [Prefix, FileNum, Offset]),
    run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd, FHc} = Handles, FileNum,
                       Offset) ->
    receive
        {seq_append, From, Prefix, Chunk, CSum} ->
            ok = file:pwrite(FHd, Offset, Chunk),
            From ! {assignment, Offset, File},
            Len = byte_size(Chunk),
            LogLine = [bin_to_hexstr(<<Offset:64/big>>), $\s,
                       bin_to_hexstr(<<Len:32/big>>), $\s,
                       bin_to_hexstr(CSum), $\n],
            ok = file:write(FHc, LogLine),
            seq_append_server_loop(DataDir, Prefix, File, Handles, FileNum,
                                   Offset + Len)
    after 30000 ->
        lists:foreach(fun(FH) -> ok = file:close(FH) end, [FHd, FHc]),
        info_msg("stop: ~p server at file ~w offset ~w\n",
                 [Prefix, FileNum, Offset]),
        exit(normal)
    end.
%% Registered-name atom for Prefix's sequencer.
%% NOTE(review): Prefix comes from network clients (the "A" command), and
%% atoms are never garbage-collected, so unbounded distinct prefixes can
%% exhaust the atom table.
make_regname(Prefix) ->
    binary_to_atom(Prefix, latin1).
%% Path of Prefix's file-number counter: "DataDir/config/Prefix".
make_config_filename(DataDir, Prefix) ->
    Template = "~s/config/~s",
    lists:flatten(io_lib:format(Template, [DataDir, Prefix])).
%% Checksum-log path for a data file.
%% The /4 form names it from its parts ("Prefix.Sequencer.FileNum"); the
%% /2 form takes the data file name directly. Both yield
%% "DataDir/config/<FileName>.csum".
make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
    FileName = lists:flatten(io_lib:format("~s.~s.~w",
                                           [Prefix, SequencerName, FileNum])),
    make_checksum_filename(DataDir, FileName).
make_checksum_filename(DataDir, FileName) ->
    Template = "~s/config/~s.csum",
    lists:flatten(io_lib:format(Template, [DataDir, FileName])).
%% Data-file naming. Both forms return {FileName, "DataDir/FileName"}.
%% The /4 form builds FileName as the binary "Prefix.Sequencer.FileNum".
make_data_filename(DataDir, File) ->
    {File, lists:flatten(io_lib:format("~s/~s", [DataDir, File]))}.
make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
    Name = iolist_to_binary(io_lib:format("~s.~s.~w",
                                          [Prefix, SequencerName, FileNum])),
    make_data_filename(DataDir, Name).
%% Highest file number used so far for Prefix. The counter file grows by
%% one byte per allocated number, so its size is the count; a missing
%% counter file means 0.
read_max_filenum(DataDir, Prefix) ->
    CounterPath = make_config_filename(DataDir, Prefix),
    case file:read_file_info(CounterPath) of
        {error, enoent} -> 0;
        {ok, Info} -> Info#file_info.size
    end.
%% Bump Prefix's file-number counter by appending one byte to it
%% (see read_max_filenum/2). No fsync is done.
increment_max_filenum(DataDir, Prefix) ->
    ok = file:write_file(make_config_filename(DataDir, Prefix), "x", [append]).
%% @doc Decode a hex string (list or binary; upper or lower case) into a
%% binary, two digits per byte.
%%
%% Each pair is decoded with the list_to_integer/2 BIF instead of
%% io_lib:fread/2. Running the full format-string parser once per byte
%% was needlessly slow on this hot protocol path. Malformed input still
%% crashes: an odd digit count gives function_clause, a non-hex digit
%% gives badarg. The /2 arity is kept for compatibility (export_all).
hexstr_to_bin(S) when is_list(S) ->
    hexstr_to_bin(S, []);
hexstr_to_bin(B) when is_binary(B) ->
    hexstr_to_bin(binary_to_list(B), []).
hexstr_to_bin([], Acc) ->
    list_to_binary(lists:reverse(Acc));
hexstr_to_bin([X, Y | T], Acc) ->
    hexstr_to_bin(T, [list_to_integer([X, Y], 16) | Acc]).
%% Encode a binary as a lowercase hex character list, high nibble first.
bin_to_hexstr(Bin) when is_binary(Bin) ->
    [hex_digit(Nibble) || <<Nibble:4>> <= Bin].
%% Map 0..15 to "0".."9","a".."f".
hex_digit(X) when X < 10 ->
    $0 + X;
hex_digit(X) ->
    $a + X - 10.
%%%%%%%%%%%%%%%%%
%%% escript stuff
%% @doc Command dispatcher used by the escripts.
%%
%% The first list element selects a sub-command. Giving a sub-command with
%% no further arguments prints its usage text and halts the VM with
%% status 1.
main2(["file-write-client"]) ->
    io:format("Use: Write a local file to a single server.\n"),
    io:format("Args: Host Port BlockSize Prefix LocalFile [OutputPath|'console']\n"),
    erlang:halt(1);
main2(["file-write-client", Host, PortStr, BlockSizeStr, PrefixStr, LocalFile|Output]) ->
    Sock = escript_connect(Host, PortStr),
    BlockSize = list_to_integer(BlockSizeStr),
    Prefix = list_to_binary(PrefixStr),
    Res = escript_upload_file(Sock, BlockSize, Prefix, LocalFile),
    %% Output is the *list* of remaining arguments. Pass a single path
    %% string, as chunk-read-client does. Previously the list itself was
    %% passed, so an explicit "console" arrived as ["console"].
    %% NOTE(review): presumably open_output_file/1 compares its argument
    %% with the string "console"; confirm.
    OutputPath = case Output of
                     [] -> "console";
                     [Path|_] -> Path
                 end,
    FH = open_output_file(OutputPath),
    print_upload_details(FH, Res),
    (catch file:close(FH)),
    Res;
main2(["1file-write-redundant-client"]) ->
    io:format("Use: Write a local file to a series of servers.\n"),
    io:format("Args: BlockSize Prefix LocalFilePath [silent] [Host Port [Host Port ...]]\n"),
    erlang:halt(1);
main2(["1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile|HPs0]) ->
    BlockSize = list_to_integer(BlockSizeStr),
    Prefix = list_to_binary(PrefixStr),
    {Out, HPs} = case HPs0 of
                     ["silent"|Rest] -> {silent, Rest};
                     _ -> {not_silent, HPs0}
                 end,
    Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile),
    if Out /= silent ->
            print_upload_details(user, Res);
       true ->
            ok
    end,
    Res;
main2(["chunk-read-client"]) ->
    io:format("Use: Read a series of chunks for a single server.\n"),
    io:format("Args: Host Port LocalChunkDescriptionPath [OutputPath|'console']\n"),
    erlang:halt(1);
main2(["chunk-read-client", Host, PortStr, ChunkFileList]) ->
    main2(["chunk-read-client", Host, PortStr, ChunkFileList, "console"]);
main2(["chunk-read-client", Host, PortStr, ChunkFileList, OutputPath]) ->
    FH = open_output_file(OutputPath),
    OutFun = make_outfun(FH),
    try
        main2(["chunk-read-client2", Host, PortStr, ChunkFileList, OutFun])
    after
        (catch file:close(FH))
    end;
main2(["chunk-read-client2", Host, PortStr, ChunkFileList, ProcFun]) ->
    Sock = escript_connect(Host, PortStr),
    escript_download_chunks(Sock, ChunkFileList, ProcFun);
main2(["list-client"]) ->
    io:format("Use: List all files on a single server.\n"),
    io:format("Args: Host Port [OutputPath]\n"),
    erlang:halt(1);
main2(["list-client", Host, PortStr]) ->
    Sock = escript_connect(Host, PortStr),
    escript_list(Sock);
main2(["list-client", Host, PortStr, OutputPath]) ->
    Sock = escript_connect(Host, PortStr),
    escript_list(Sock, OutputPath);
main2(["checksum-list-client"]) ->
    io:format("Use: List all chunk sizes & checksums for a single file.\n"),
    io:format("Args: [ 'line-by-line' ] Host Port File\n"),
    erlang:halt(1);
main2(["checksum-list-client-line-by-line", Host, PortStr, File]) ->
    Sock = escript_connect(Host, PortStr),
    ProcFun = make_outfun(user),
    escript_checksum_list(Sock, File, line_by_line, ProcFun);
main2(["checksum-list-client", Host, PortStr, File]) ->
    Sock = escript_connect(Host, PortStr),
    ProcFun = make_outfun(user),
    escript_checksum_list(Sock, File, fast, ProcFun);
main2(["delete-client"]) ->
    io:format("Use: Delete a file (NOT FOR GENERAL USE)\n"),
    io:format("Args: Host Port File\n"),
    erlang:halt(1);
main2(["delete-client", Host, PortStr, File]) ->
    Sock = escript_connect(Host, PortStr),
    escript_delete(Sock, File);
main2(["server"]) ->
    io:format("Use: Run a file server + TCP listen port service.\n"),
    io:format("Args: Port DataDirPath\n"),
    erlang:halt(1);
main2(["server", RegNameStr, PortStr, DataDir]) ->
    Port = list_to_integer(PortStr),
    %% application:start(sasl),
    _Pid1 = start_listen_server(Port, DataDir),
    _Pid2 = start_append_server(list_to_atom(RegNameStr), DataDir),
    receive forever -> ok end;
%%%% cc flavors %%%%
main2(["cc-1file-write-redundant-client"]) ->
    io:format("Use: Write a local file to a chain via projection.\n"),
    io:format("Args: BlockSize Prefix LocalFilePath ProjectionPath\n"),
    erlang:halt(1);
main2(["cc-1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile, ProjectionPath]) ->
    BlockSize = list_to_integer(BlockSizeStr),
    Prefix = list_to_binary(PrefixStr),
    {_Chain, RawHPs} = calc_chain(write, ProjectionPath, PrefixStr),
    HPs = convert_raw_hps(RawHPs),
    Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile),
    print_upload_details(user, Res),
    Res;
main2(["cc-chunk-read-client"]) ->
    io:format("Use: Read a series of chunks from a chain via projection.\n"),
    io:format("Args: ProjectionPath ChunkFileList [OutputPath|'console' \\\n\t[ErrorCorrection_ProjectionPath]]\n"),
    erlang:halt(1);
main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList]) ->
    main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, "console",
           undefined]);
main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath]) ->
    main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
           undefined]);
main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
       EC_ProjectionPath]) ->
    main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
           EC_ProjectionPath]).
%% Chain-aware chunk reader. It loads the projection and chain map,
%% streams each downloaded chunk to the output (console or file) and
%% returns the list of results. EC_ProjectionPath may be 'undefined'.
main3(["cc-chunk-read-client",
       ProjectionPathOrDir, ChunkFileList, OutputPath, EC_ProjectionPath]) ->
    Projection = read_projection_file(ProjectionPathOrDir),
    ChainMap = read_chain_map_file(ProjectionPathOrDir),
    Out = open_output_file(OutputPath),
    Emit = make_outfun(Out),
    try
        escript_cc_download_chunks(ChunkFileList, Projection, ChainMap, Emit,
                                   EC_ProjectionPath)
    after
        (catch file:close(Out))
    end.
%% Connect to Host:PortStr with a passive, binary, raw-packet socket.
escript_connect(Host, PortStr) ->
    SockOpts = [{active, false}, {mode, binary}, {packet, raw}],
    {ok, Sock} = gen_tcp:connect(Host, list_to_integer(PortStr), SockOpts),
    Sock.
%% Append File to the server in BlockSize pieces. Returns the list of
%% {Offset, Size, ServerFile} assignments in upload order.
escript_upload_file(Sock, BlockSize, Prefix, File) ->
    {ok, FH} = file:open(File, [read, raw, binary]),
    try
        FirstRead = file:read(FH, BlockSize),
        escript_upload_file2(FirstRead, FH, BlockSize, Prefix, Sock, [])
    after
        file:close(FH)
    end.
%% Upload loop: append each block read from FH and accumulate its
%% {Offset, Size, File} placement until EOF.
escript_upload_file2(eof, _FH, _BlockSize, _Prefix, _Sock, Acc) ->
    lists:reverse(Acc);
escript_upload_file2({ok, Chunk}, FH, BlockSize, Prefix, Sock, Acc) ->
    {OffsetHex, LenHex, File} = upload_chunk_append(Sock, Prefix, Chunk),
    verb("~s ~s ~s\n", [OffsetHex, LenHex, File]),
    <<Offset:64/big>> = hexstr_to_bin(OffsetHex),
    <<Size:32/big>> = hexstr_to_bin(LenHex),
    NextRead = file:read(FH, BlockSize),
    escript_upload_file2(NextRead, FH, BlockSize, Prefix, Sock,
                         [{Offset, Size, File} | Acc]).
%% @doc Send "A <LenHex> <Prefix>\n" plus the chunk, and parse the reply
%% "OK <OffsetHex:16> <File>\n". Returns {OffsetHex, LenHex, File}.
%%
%% The reply is read in {packet, line} mode. Callers such as
%% file-write-client hand over a raw-mode socket, where recv(Sock, 0)
%% may return a partial line and break the length-based parse below.
%% The socket's previous packet mode is restored afterwards.
upload_chunk_append(Sock, Prefix, Chunk) ->
    Len = byte_size(Chunk),
    LenHex = list_to_binary(bin_to_hexstr(<<Len:32/big>>)),
    Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>,
    {ok, OldOpts} = inet:getopts(Sock, [packet]),
    ok = inet:setopts(Sock, [{packet, line}]),
    ok = gen_tcp:send(Sock, [Cmd, Chunk]),
    {ok, Line} = gen_tcp:recv(Sock, 0),
    ok = inet:setopts(Sock, OldOpts),
    PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
    <<"OK ", OffsetHex:16/binary, " ", Path:PathLen/binary, _:1/binary>> = Line,
    {OffsetHex, LenHex, Path}.
%% @doc Replicate one chunk: send "W-repl <OffsetHex> <LenHex> <File>\n"
%% plus the chunk, and require the reply "OK\n". Offset may be an integer
%% or an already-encoded 16-digit hex binary. Returns
%% {OffsetHex, LenHex, File}.
%%
%% The reply is read in {packet, line} mode, because
%% escript_upload_redundant2 uses a raw-mode socket where recv(Sock, 0)
%% may return a partial reply. The previous packet mode is restored
%% afterwards.
upload_chunk_write(Sock, Offset, File, Chunk) when is_integer(Offset) ->
    OffsetHex = list_to_binary(bin_to_hexstr(<<Offset:64/big>>)),
    upload_chunk_write(Sock, OffsetHex, File, Chunk);
upload_chunk_write(Sock, OffsetHex, File, Chunk) when is_binary(OffsetHex) ->
    Len = byte_size(Chunk),
    LenHex = list_to_binary(bin_to_hexstr(<<Len:32/big>>)),
    Cmd = <<"W-repl ", OffsetHex/binary, " ",
            LenHex/binary, " ", File/binary, "\n">>,
    {ok, OldOpts} = inet:getopts(Sock, [packet]),
    ok = inet:setopts(Sock, [{packet, line}]),
    ok = gen_tcp:send(Sock, [Cmd, Chunk]),
    {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
    ok = inet:setopts(Sock, OldOpts),
    {OffsetHex, LenHex, File}.
%% Append LocalFile to the first server in the flat [Host, Port, ...]
%% list. Then replicate the same placements to each remaining server
%% with W-repl. Returns the {Offset, Size, File} placement list.
escript_upload_redundant([Host, PortStr | MoreHPs], BlockSize, Prefix, LocalFile) ->
    Head = escript_connect(Host, PortStr),
    ok = inet:setopts(Head, [{packet, line}]),
    Placements =
        try
            escript_upload_file(Head, BlockSize, Prefix, LocalFile)
        after
            gen_tcp:close(Head)
        end,
    escript_upload_redundant2(MoreHPs, Placements, LocalFile, Placements).
%% Replicate LocalFile to each remaining [Host, Port, ...] server.
%% The file is re-read sequentially, and each {Offset, Size, File}
%% placement is written with W-repl. The 2nd and 4th arguments must be
%% the same placement list (the head binds OSFs twice). The list is
%% returned unchanged.
escript_upload_redundant2([Host, PortStr | MoreHPs], OSFs, LocalFile, OSFs) ->
    Replica = escript_connect(Host, PortStr),
    {ok, Src} = file:open(LocalFile, [read, binary, raw]),
    try
        _ = [begin
                 {ok, Chunk} = file:read(Src, Size),
                 upload_chunk_write(Replica, Offset, File, Chunk)
             end || {Offset, Size, File} <- OSFs]
    after
        gen_tcp:close(Replica),
        file:close(Src)
    end,
    escript_upload_redundant2(MoreHPs, OSFs, LocalFile, OSFs);
escript_upload_redundant2([], _OSFs, _LocalFile, OSFs) ->
    OSFs.
%% @doc Download every chunk described in ChunkFileList (one
%% "OffsetHex LenHex File\n" line per chunk) from a single server.
%% Alternatively, {{{Line}}} downloads a single literal line. Each result
%% is passed to ProcFun, and the list of results is returned.
%%
%% The chunk-list file handle was previously never closed; it is now
%% closed once the (non-tail-recursive) download has consumed every line.
escript_download_chunks(Sock, {{{ChunkLine}}}, ProcFun) ->
    escript_download_chunk({ok, ChunkLine}, invalid_fd, Sock, ProcFun);
escript_download_chunks(Sock, ChunkFileList, ProcFun) ->
    {ok, FH} = file:open(ChunkFileList, [read, raw, binary]),
    try
        escript_download_chunk(file:read_line(FH), FH, Sock, ProcFun)
    after
        file:close(FH)
    end.
%% Per-line download loop for a single server. Each line is fetched and
%% handed to ProcFun, then the next line is read. Anything other than
%% {ok, Line} (eof, an error, or a crash of read_line on invalid_fd) ends
%% the loop with ProcFun(eof).
escript_download_chunk({ok, Line}, FH, Sock, ProcFun) ->
    Result = escript_cc_download_chunk2(Sock, Line),
    ProcFun(Result),
    NextLine = (catch file:read_line(FH)),
    [Result | escript_download_chunk(NextLine, FH, Sock, ProcFun)];
escript_download_chunk(_EofOrError, _FH, _Sock, ProcFun) ->
    ProcFun(eof),
    [].
%% @doc Chain-aware variant of escript_download_chunks/3. Each chunk is
%% read from the chain that projection P / ChainMap selects for its
%% prefix. Erasure-coded stubs are resolved through EC_ProjectionPath
%% when it is not 'undefined'.
%%
%% The chunk-list file handle was previously never closed; it is now
%% closed once every line has been processed.
escript_cc_download_chunks({{{ChunkLine}}}, P, ChainMap, ProcFun,
                           EC_ProjectionPath) ->
    escript_cc_download_chunk({ok, ChunkLine}, invalid_fd, P, ChainMap, ProcFun,
                              EC_ProjectionPath);
escript_cc_download_chunks(ChunkFileList, P, ChainMap, ProcFun,
                           EC_ProjectionPath) ->
    {ok, FH} = file:open(ChunkFileList, [read, raw, binary]),
    try
        escript_cc_download_chunk(file:read_line(FH), FH, P, ChainMap, ProcFun,
                                  EC_ProjectionPath)
    after
        file:close(FH)
    end.
%% Per-line download loop for the chain-aware reader.
%%
%% Line must be "<OffsetHex:16> <LenHex:8> <File>\n" (LF-terminated).
%% The chain is computed from the file's prefix, i.e. the text before its
%% first ".". Servers in the chain are tried in order until one returns a
%% binary. After the first binary, the remaining servers are skipped.
%% Non-binary results ({error, _}, or an unresolved {erasure_encoded, _}
%% when EC_ProjectionPath is 'undefined') let the fold move on to the
%% next server. If every server fails, the last server's result is kept;
%% with an empty chain the result is 'undefined'. Each result is passed
%% to ProcFun and collected into the returned list.
escript_cc_download_chunk({ok, Line}, FH, P, ChainMap, ProcFun,
                          EC_ProjectionPath) ->
    RestLen = byte_size(Line) - 16 - 1 - 8 - 1 - 1,
    <<_Offset:16/binary, " ", _Len:8/binary, " ", Rest:RestLen/binary, "\n">>
        = Line,
    %% Strip everything from the first "." onwards to get the prefix.
    Prefix = re:replace(Rest, "\\..*", "", [{return, binary}]),
    {_Chains, RawHPs} = calc_chain(read, P, ChainMap, Prefix),
    Chunk = lists:foldl(
              %% Already have the data: skip the remaining servers.
              fun(_RawHP, Bin) when is_binary(Bin) -> Bin;
                 (RawHP, _) ->
                      [Host, PortStr] = convert_raw_hps([RawHP]),
                      Sock = get_cached_sock(Host, PortStr),
                      case escript_cc_download_chunk2(Sock, Line) of
                          Bin when is_binary(Bin) ->
                              Bin;
                          {error, _} = Error ->
                              Error;
                          {erasure_encoded, _} = EC_info ->
                              %% Stub file: fetch the stripe via the EC projection.
                              escript_cc_download_ec_chunk(EC_info,
                                                           EC_ProjectionPath)
                      end
              end, undefined, RawHPs),
    ProcFun(Chunk),
    [Chunk|escript_cc_download_chunk((catch file:read_line(FH)),
                                     FH, P, ChainMap, ProcFun,
                                     EC_ProjectionPath)];
%% End of the chunk list.
escript_cc_download_chunk(eof, _FH, _P, _ChainMap, ProcFun,
                          _EC_ProjectionPath) ->
    ProcFun(eof),
    [];
%% Any other read_line outcome (e.g. an error, or the caught crash when
%% FH is invalid_fd) is handed to ProcFun as-is and ends the loop.
escript_cc_download_chunk(Else, _FH, _P, _ChainMap, ProcFun,
                          _EC_ProjectionPath) ->
    ProcFun(Else),
    [].
%% @doc Send "R <Line>" (Line already ends in "\n") on a raw-mode socket
%% and return the chunk binary. Returns {error, {Line, Reply}} on a
%% server error, or {erasure_encoded, Info} when the server answers
%% "ERASURE ..." (see escript_cc_parse_ec_info/3).
%%
%% A zero-size chunk is answered with "OK\n" and no data. It must not
%% call gen_tcp:recv(Sock, 0): in raw mode that means "whatever is
%% available" and would block or over-read.
escript_cc_download_chunk2(Sock, Line) ->
    %% Line includes an LF, so we can be lazy.
    CmdLF = [<<"R ">>, Line],
    ok = gen_tcp:send(Sock, CmdLF),
    case gen_tcp:recv(Sock, 3) of
        {ok, <<"OK\n">>} ->
            {_Offset, Size, _File} = read_hex_size(Line),
            case Size of
                0 ->
                    <<>>;
                _ ->
                    {ok, Chunk} = gen_tcp:recv(Sock, Size),
                    Chunk
            end;
        {ok, Else} ->
            %% Not "OK\n": read the rest of the reply line in line mode,
            %% then restore the caller's packet mode.
            {ok, OldOpts} = inet:getopts(Sock, [packet]),
            ok = inet:setopts(Sock, [{packet, line}]),
            {ok, Else2} = gen_tcp:recv(Sock, 0),
            ok = inet:setopts(Sock, OldOpts),
            case Else of
                <<"ERA">> ->
                    escript_cc_parse_ec_info(Sock, Line, Else2);
                _ ->
                    {error, {Line, <<Else/binary, Else2/binary>>}}
            end
    end.
%% Parse an "ERASURE" reply and map the client's request onto a stripe file.
%%
%% Else2 is the remainder of the reply line after the already-consumed
%% "ERA": "SURE <BodyLenHex:4> <StripeWidthHex:16> <OrigFileLenHex:16>
%% rs_10_4_v1...". The BodyLen-byte summary body is then read from Sock.
%% This assumes Sock is back in raw mode, so recv/2 returns exactly that
%% many bytes. The body is read before any of the error returns below,
%% which keeps the socket in sync.
%%
%% The requested Offset selects stripe file number (Offset div
%% StripeWidth) + 1, named File ++ "_kNN", at offset Offset rem
%% StripeWidth. Returns {erasure_encoded, {Offset, Size, File, NewOffset,
%% NewFile, NewFileNum, NewLine, SummaryBody}}, where NewLine is the
%% "R" request line for that stripe. Returns {error, _} if the request
%% extends past the original file length or straddles a stripe boundary.
%% NOTE(review): read_hex_size/1 is assumed to return integer
%% {Offset, Size, File}; confirm.
escript_cc_parse_ec_info(Sock, Line, Else2) ->
    ChompLine = chomp(Line),
    {Offset, Size, File} = read_hex_size(ChompLine),
    <<"SURE ", BodyLenHex:4/binary, " ", StripeWidthHex:16/binary, " ",
      OrigFileLenHex:16/binary, " rs_10_4_v1", _/binary>> = Else2,
    <<BodyLen:16/big>> = hexstr_to_bin(BodyLenHex),
    {ok, SummaryBody} = gen_tcp:recv(Sock, BodyLen),
    <<StripeWidth:64/big>> = hexstr_to_bin(StripeWidthHex),
    <<OrigFileLen:64/big>> = hexstr_to_bin(OrigFileLenHex),
    %% Stripe files are numbered from 1.
    NewFileNum = (Offset div StripeWidth) + 1,
    NewOffset = Offset rem StripeWidth,
    if Offset + Size > OrigFileLen ->
            %% Client's request extends past the original file size.
            {error, bad_offset_and_size};
       NewOffset + Size > StripeWidth ->
            %% Client's request straddles a stripe boundary, TODO fix me
            {error, todo_TODO_implement_this_with_two_reads_and_then_glue_together};
       true ->
            NewOffsetHex = bin_to_hexstr(<<NewOffset:64/big>>),
            LenHex = bin_to_hexstr(<<Size:32/big>>),
            NewSuffix = file_suffix_rs_10_4_v1(NewFileNum),
            NewFile = iolist_to_binary([File, NewSuffix]),
            NewLine = iolist_to_binary([NewOffsetHex, " ", LenHex, " ",
                                        NewFile, "\n"]),
            {erasure_encoded, {Offset, Size, File, NewOffset, NewFile,
                               NewFileNum, NewLine, SummaryBody}}
    end.
%% TODO: The EC method/version/type stuff here is loosey-goosey
%% Resolve an {erasure_encoded, Info} result by reading the matching
%% stripe file. With no EC projection ('undefined'), Info is returned
%% unchanged. The stripe's chain is computed from the whole stripe file
%% name, not from a prefix. The NewFileNum-th server in that chain is
%% asked for the data. On failure, diagnostics are printed and the
%% {error, _} is returned.
escript_cc_download_ec_chunk(EC_info, undefined) ->
    EC_info;
escript_cc_download_ec_chunk({erasure_encoded,
                              {_Offset, _Size, _File, _NewOffset, NewFile,
                               NewFileNum, NewLine, SummaryBody}},
                             EC_ProjectionPath) ->
    {P, ChainMap} = get_cached_projection(EC_ProjectionPath),
    {_Chains, RawHPs} = calc_chain(read, P, ChainMap, NewFile),
    [Host, PortStr] = convert_raw_hps([lists:nth(NewFileNum, RawHPs)]),
    StripeSock = get_cached_sock(Host, PortStr),
    case escript_cc_download_chunk2(StripeSock, NewLine) of
        {error, _} = Failure ->
            io:format("TODO: EC chunk get failed:\n\t~s\n", [NewLine]),
            io:format("Use this info to reconstruct:\n\t~p\n\n", [SummaryBody]),
            Failure;
        Chunk when is_binary(Chunk) ->
            Chunk
    end.
%% Memoize {Projection, ChainMap} in the process dictionary under the key
%% 'cached_projection'.
%% NOTE(review): the cache is not keyed by path, so a later call with a
%% different EC_ProjectionPath returns the first path's projection.
get_cached_projection(EC_ProjectionPath) ->
    case get(cached_projection) of
        undefined ->
            Projection = read_projection_file(EC_ProjectionPath),
            ChainMap = read_chain_map_file(EC_ProjectionPath),
            Loaded = {Projection, ChainMap},
            put(cached_projection, Loaded),
            Loaded;
        Cached ->
            Cached
    end.
%% Stripe-file suffix for the rs_10_4_v1 scheme: <<"_k01">> .. <<"_k10">>.
%% Any value outside the integers 1..10 raises function_clause.
file_suffix_rs_10_4_v1(N) when is_integer(N), N >= 1, N =< 10 ->
    iolist_to_binary(io_lib:format("_k~2..0w", [N])).
%% List all files on the server. Output goes to the console (/1) or to
%% the file OutputPath (/2), which is closed afterwards.
escript_list(Sock) ->
    ToConsole = make_outfun(user),
    escript_list2(Sock, ToConsole).
escript_list(Sock, OutputPath) ->
    io:format("OutputPath ~p\n", [OutputPath]),
    {ok, Out} = file:open(OutputPath, [write]),
    try escript_list2(Sock, make_outfun(Out))
    after file:close(Out)
    end.
%% Send "L", require "OK\n", then collect listing lines until ".\n".
%% Each line goes to ProcFun, and eof is signalled at the end. The socket
%% is left in raw packet mode afterwards.
%% NOTE(review): on a receive error the /3 helper returns {error, _} as
%% the list tail, so the overall result is an improper list.
escript_list2(Sock, ProcFun) ->
    ok = gen_tcp:send(Sock, <<"L\n">>),
    ok = inet:setopts(Sock, [{packet, line}]),
    {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
    Listing = escript_list2(gen_tcp:recv(Sock, 0), Sock, ProcFun),
    ok = inet:setopts(Sock, [{packet, raw}]),
    Listing.
escript_list2(Reply, Sock, ProcFun) ->
    case Reply of
        {ok, <<".\n">>} ->
            ProcFun(eof),
            [];
        {ok, Line} ->
            ProcFun(Line),
            [Line | escript_list2(gen_tcp:recv(Sock, 0), Sock, ProcFun)];
        Failure ->
            ProcFun(io_lib:format("ERROR: ~p\n", [Failure])),
            {error, Failure}
    end.
%% Request File's checksum log with "C <File>".
%% On "OK <LenHex>\n" it records status ok in the process dictionary and
%% forwards the header to ProcFun. It then streams the body either line
%% by line (until ".\n") or, for the 'fast' method, by exact byte count
%% in raw mode. Any other reply sets status to error and prints it.
escript_checksum_list(Sock, File, Method, ProcFun) ->
    ok = gen_tcp:send(Sock, [<<"C ">>, File, <<"\n">>]),
    ok = inet:setopts(Sock, [{packet, line}]),
    case gen_tcp:recv(Sock, 0) of
        {ok, <<"OK ", Rest/binary>> = Header} ->
            put(status, ok),                    % may be unset later
            ProcFun(Header),
            case Method of
                line_by_line ->
                    escript_checksum_list_line_by_line(Sock, ProcFun);
                fast ->
                    %% Rest is "<LenHex>\n": drop the trailing newline.
                    HexLen = byte_size(Rest) - 1,
                    <<LenHex:HexLen/binary, _Newline:1/binary>> = Rest,
                    <<Len:64/big>> = hexstr_to_bin(LenHex),
                    ok = inet:setopts(Sock, [{packet, raw}]),
                    escript_checksum_list_fast(Sock, Len, ProcFun)
            end;
        {ok, Unexpected} ->
            put(status, error),
            io:format("ERROR: ~p\n", [Unexpected])
    end.
%% Forward checksum lines from Sock (in {packet, line} mode) to ProcFun one
%% at a time. The ".\n" terminator is forwarded too, and then ok is
%% returned. A recv error raises case_clause.
escript_checksum_list_line_by_line(Sock, ProcFun) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, Line} ->
            ProcFun(Line),
            case Line of
                <<".\n">> -> ok;
                _ -> escript_checksum_list_line_by_line(Sock, ProcFun)
            end
    end.
%% Stream exactly Remaining bytes of checksum data from Sock (raw mode) to
%% ProcFun, in reads of at most 1 MiB. Then read, forward, and verify the
%% 2-byte ".\n" terminator and return ok. Any recv failure or wrong
%% terminator crashes with badmatch.
escript_checksum_list_fast(Sock, 0, ProcFun) ->
    {ok, <<".\n">> = Terminator} = gen_tcp:recv(Sock, 2),
    ProcFun(Terminator),
    ok;
escript_checksum_list_fast(Sock, Remaining, ProcFun) ->
    MaxRead = 1 bsl 20,
    {ok, Chunk} = gen_tcp:recv(Sock, min(Remaining, MaxRead)),
    ProcFun(Chunk),
    StillToRead = Remaining - byte_size(Chunk),
    escript_checksum_list_fast(Sock, StillToRead, ProcFun).
%% Ask the server to delete File ("DEL-migration <File>\n").
%% Returns ok on "OK\n" and error on any "ERROR..." reply. Other replies or
%% recv errors raise case_clause.
%% NOTE(review): the socket is left in {packet, line} mode afterwards.
escript_delete(Sock, File) ->
    Request = [<<"DEL-migration ">>, File, <<"\n">>],
    ok = gen_tcp:send(Sock, Request),
    ok = inet:setopts(Sock, [{packet, line}]),
    case gen_tcp:recv(Sock, 0) of
        {ok, <<"ERROR", _/binary>>} -> error;
        {ok, <<"OK\n">>} -> ok
    end.
%% Compare the file lists of two servers, considering every file
%% (no filtering). See escript_compare_servers/6.
escript_compare_servers(Sock1, Sock2, H1, H2, Args) ->
    AcceptEveryFile = fun(_File) -> true end,
    escript_compare_servers(Sock1, Sock2, H1, H2, AcceptEveryFile, Args).
%% Compare the file listings of hosts H1 (via Sock1) and H2 (via Sock2).
%%
%% Both listings are folded into a dict kept under the process-dictionary
%% key 'mydict', mapping File -> [{Host, Size}].
%%
%% Args chooses where the two header terms ({legend, ...} and {all, ...})
%% are written:
%%   []        -> the console ('user' device)
%%   [null]    -> discarded
%%   [OutFile] -> file OutFile (closed at the end)
%%
%% Returns [{File, {MaxSize, HostsWithoutFile}}] sorted by File, for files
%% accepted by FileFilterFun. HostsWithoutFile is [] when both hosts have
%% the file. Each entry is also printed via verb/2.
escript_compare_servers(Sock1, Sock2, H1, H2, FileFilterFun, Args) ->
    All = [H1, H2],
    put(mydict, dict:new()),
    Fetch1 = make_fetcher(H1),
    Fetch2 = make_fetcher(H2),
    Fmt = case Args of
              [] ->
                  %% Str is data, not a format string. Print it verbatim so
                  %% a '~' in it cannot be read as a control sequence.
                  fun(eof) -> ok;
                     (Str) -> io:format(user, "~ts", [Str])
                  end;
              [null] ->
                  fun(_) -> ok end;
              [OutFile] ->
                  {ok, FH} = file:open(OutFile, [write]),
                  fun(eof) -> file:close(FH);
                     (Str) -> file:write(FH, Str)
                  end
          end,
    _X1 = escript_list2(Sock1, Fetch1),
    _X2 = escript_list2(Sock2, Fetch2),
    FoldRes = lists:sort(dict:to_list(get(mydict))),
    Fmt("{legend, {file, list_of_servers_without_file}}.\n"),
    Fmt(io_lib:format("{all, ~p}.\n", [All])),
    Res = [begin
               {GotIt, Sizes} = lists:unzip(GotSizes),
               Size = lists:max(Sizes),
               Missing = {File, {Size, All -- GotIt}},
               verb("~p.\n", [Missing]),
               Missing
           end || {File, GotSizes} <- FoldRes, FileFilterFun(File)],
    (catch Fmt(eof)),
    Res.
%% Build an escript_list2 ProcFun for Host.
%%
%% Each listing line "<16 hex size> <file>\n" appends {Host, Size} under
%% File in the dict stored at process-dictionary key 'mydict'. 'eof' is
%% ignored.
%% NOTE(review): any other input, such as escript_list2's "ERROR: ..."
%% string, raises function_clause.
make_fetcher(Host) ->
    fun(eof) ->
            ok;
       (<<SizeHex:16/binary, " ", NameWithNewline/binary>>) ->
            <<Size:64/big>> = hexstr_to_bin(SizeHex),
            NameLen = byte_size(NameWithNewline) - 1,
            <<File:NameLen/binary, _/binary>> = NameWithNewline,
            put(mydict, dict:append(File, {Host, Size}, get(mydict)))
    end.
%% 16-byte MD5 digest of a binary chunk.
checksum(Bin) when is_binary(Bin) ->
    erlang:md5(Bin).
%% Print Fmt/Args to standard output, but only when the kernel application
%% env 'verbose' is exactly true. Returns ok.
verb(Fmt) ->
    verb(Fmt, []).

verb(Fmt, Args) ->
    IsVerbose = application:get_env(kernel, verbose) =:= {ok, true},
    if
        IsVerbose -> io:format(Fmt, Args);
        true -> ok
    end.
%% Log an info report via error_logger unless the kernel env 'verbose' is
%% explicitly false. Unlike verb/2, this is on by default when unset.
info_msg(Fmt, Args) ->
    Silenced = application:get_env(kernel, verbose) =:= {ok, false},
    if
        Silenced -> ok;
        true -> error_logger:info_msg(Fmt, Args)
    end.
%% Repair one File, given MissingList (the hosts that lack it).
%%
%%  * [] : both sides have it; compare contents with repair_both_present/8.
%%  * Src in MissingList : the source lacks it, so skip.
%%  * Mode == check : only report what would be copied.
%%  * otherwise : copy the whole file from source to destination with
%%    copy_file/5. A non-ok result crashes with badmatch.
repair(File, Size, [], Mode, V, SrcS, SrcS2, DstS, DstS2, _Src) ->
    verb("~s: present on both: ", [File]),
    repair_both_present(File, Size, Mode, V, SrcS, SrcS2, DstS, DstS2);
repair(File, Size, MissingList, Mode, V, SrcS, SrcS2, DstS, _DstS2, Src) ->
    SrcLacksFile = lists:member(Src, MissingList),
    if
        SrcLacksFile ->
            verb("~s -> ~p, skipping: not on source server\n",
                 [File, MissingList]);
        Mode == check ->
            verb("~s -> ~p, copy ~s MB (skipped)\n",
                 [File, MissingList, mbytes(Size)]);
        true ->
            verb("~s -> ~p, copy ~s MB ", [File, MissingList, mbytes(Size)]),
            ok = copy_file(File, SrcS, SrcS2, DstS, V),
            verb("done\n", [])
    end.
%% Copy every chunk of File from the source server to DstS.
%% Returns the result of escript_checksum_list/4.
copy_file(File, SrcS, SrcS2, DstS, Verbose) ->
    %% The *second* source socket (SrcS2) downloads each chunk's bytes ...
    CopyOneChunk = copy_file_proc_checksum_fun(File, SrcS2, DstS, Verbose),
    %% ... while the *first* source socket (SrcS) enumerates the chunks and
    %% their checksums line by line.
    escript_checksum_list(SrcS, File, line_by_line, CopyOneChunk).
%% Build a ProcFun for escript_checksum_list/4 that copies one chunk per
%% checksum line.
%%
%% For a line "<16 hex offset> <8 hex len> <32 hex md5>\n":
%%  * the chunk is fetched from SrcS via escript_download_chunks/3;
%%  * its size and MD5 are compared with the line;
%%  * on a match, it is stored on DstS via upload_chunk_write/4 at the same
%%    offset and ok is returned;
%%  * on a mismatch, an error is printed and error is returned.
%% Any other line (the "OK ..." header, the ".\n" terminator) yields ok.
copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->
    IgnoreProgress = fun(_) -> ok end,
    fun(<<OffsetHex:16/binary, " ", LenHex:8/binary, " ",
          CSumHex:32/binary, "\n">>) ->
            <<ExpectedLen:32/big>> = hexstr_to_bin(LenHex),
            Request = <<OffsetHex/binary, " ", LenHex/binary, " ",
                        File/binary, "\n">>,
            [Chunk] = escript_download_chunks(SrcS, {{{Request}}},
                                              IgnoreProgress),
            ExpectedCSum = hexstr_to_bin(CSumHex),
            ActualCSum = checksum(Chunk),
            case {byte_size(Chunk), ActualCSum} of
                {ExpectedLen, ExpectedCSum} ->
                    {_, _, _} = upload_chunk_write(DstS, OffsetHex, File,
                                                   Chunk),
                    ok;
                _Mismatch ->
                    io:format("ERROR: ~s ~s ~s csum/size error\n",
                              [File, OffsetHex, LenHex]),
                    error
            end;
       (_NotAChecksumLine) ->
            ok
    end.
%% Compare (and, in repair mode, fix) a File that exists on both servers.
%%
%% Both servers' "fast" checksum streams for File are written to temp files
%% Tmp1/Tmp2 and MD5-hashed on the fly. The header line and terminator are
%% included in the hash, because escript_checksum_list/4 passes them to the
%% ProcFun as well.
%%  * Equal digests: the file is reported as identical.
%%  * Different digests: ./REPAIR-SORT-JOIN.sh is run on the temp files.
%%    If that script creates S_Identical, the lists matched after sorting.
%%    Otherwise repair_both/9 processes the join outputs
%%    (J_Both / J_SrcOnly / J_DstOnly).
%% NOTE(review): the script's exact contract (sort + join, and when it
%% creates S_Identical) is inferred from the file names; confirm against
%% REPAIR-SORT-JOIN.sh.
%% All temp files are deleted in the 'after' clause.
%% NOTE(review): the 'nodelete-tmp' repair option is not consulted here.
repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) ->
%% now() is unique per call, so every temp path below is distinct.
Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))),
Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))),
J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))),
J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))),
J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))),
S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))),
{ok, FH1} = file:open(Tmp1, [write, raw, binary]),
{ok, FH2} = file:open(Tmp2, [write, raw, binary]),
try
%% The ProcFun cannot carry state, so the current {FileHandle, MD5 ctx}
%% pair lives in the process dictionary under K and is swapped between
%% the source pass and the destination pass.
K = md5_ctx,
MD5_it = fun(Bin) ->
{FH, MD5ctx1} = get(K),
file:write(FH, Bin),
MD5ctx2 = crypto:hash_update(MD5ctx1, Bin),
put(K, {FH, MD5ctx2})
end,
put(K, {FH1, crypto:hash_init(md5)}),
ok = escript_checksum_list(SrcS, File, fast, MD5_it),
{_, MD5_1} = get(K),
SrcMD5 = crypto:hash_final(MD5_1),
put(K, {FH2, crypto:hash_init(md5)}),
ok = escript_checksum_list(DstS, File, fast, MD5_it),
{_, MD5_2} = get(K),
DstMD5 = crypto:hash_final(MD5_2),
if SrcMD5 == DstMD5 ->
verb("identical\n", []);
true ->
%% Flush/close the temp files so the external script sees full data.
ok = file:close(FH1),
ok = file:close(FH2),
_Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical),
case file:read_file_info(S_Identical) of
{ok, _} ->
verb("identical (secondary sort)\n", []);
{error, enoent} ->
io:format("differences found:"),
repair_both(File, Size, V, Mode,
J_Both, J_SrcOnly, J_DstOnly,
SrcS, DstS)
end
end
after
%% The handles may already be closed above; errors are ignored.
catch file:close(FH1),
catch file:close(FH2),
[(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly,
S_Identical]]
end.
%% Summarize, and in repair mode act on, the join outputs produced for a
%% File that differs between source and destination.
%%
%% Line formats read by the fold funs below:
%%   J_Both    : "<off> <srclen> <srccsum> <dstlen> <dstcsum>\n"
%%               (chunk present on both sides)
%%   J_SrcOnly : "<off> <len> <csum>\n" (chunk only on the source)
%%   J_DstOnly : same format (chunk only on the destination)
%% Byte and chunk counts are tallied for identical and differing chunks.
%% They are printed when Mode == check or V == true.
%% In repair mode the differing lines are also collected (in reverse file
%% order), and then:
%%  * mismatched shared chunks go to repair_both_both/5 (not implemented);
%%  * source-only chunks are copied SrcS -> DstS;
%%  * destination-only chunks are copied DstS -> SrcS.
%% Mode must be check or repair; anything else raises if_clause.
repair_both(File, _Size, V, Mode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) ->
%% In check mode lines are only counted, never kept.
AccFun = if Mode == check ->
fun(_X, List) -> List end;
Mode == repair ->
fun( X, List) -> [X|List] end
end,
%% Accumulator: {SameBytes, SameCount, DiffBytes, DiffCount, DiffLines}.
%% Bytes are counted using the source length.
BothFun = fun(<<_OffsetSrcHex:16/binary, " ",
LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ",
LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line,
{SameB, SameC, DiffB, DiffC, Ds}) ->
<<Len:32/big>> = hexstr_to_bin(LenSrcHex),
if LenSrcHex == LenDstHex,
CSumSrcHex == CSumDstHex ->
{SameB + Len, SameC + 1, DiffB, DiffC, Ds};
true ->
%% D = {OffsetSrcHex, LenSrcHex, ........
{SameB, SameC, DiffB + Len, DiffC + 1,
AccFun(Line, Ds)}
end;
(_Else, Acc) ->
Acc
end,
%% Accumulator: {DiffBytes, DiffCount, DiffLines}; every line is a diff.
OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ",
_CSumHex:32/binary, "\n">> = Line,
{DiffB, DiffC, Ds}) ->
<<Len:32/big>> = hexstr_to_bin(LenSrcHex),
{DiffB + Len, DiffC + 1, AccFun(Line, Ds)};
(_Else, Acc) ->
Acc
end,
{SameBx, SameCx, DiffBy, DiffCy, BothDiffs} =
file_folder(BothFun, {0,0,0,0,[]}, J_Both),
{DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly),
{DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly),
if Mode == check orelse V == true ->
io:format("\n\t"),
io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]),
io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]),
io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]),
io:format("\n");
true -> ok
end,
if Mode == repair ->
ok = repair_both_both(File, V, BothDiffs, SrcS, DstS),
ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src,
SrcS, DstS),
%% Note the swapped sockets: destination-only chunks flow back to the
%% source.
ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst,
DstS, SrcS);
true ->
ok
end.
%% Handle chunks that exist on both servers but differ in size or checksum.
%% Nothing to do for an empty list. Otherwise this is unimplemented: it
%% prints a warning, pauses 3 seconds, and returns ok without repairing.
repair_both_both(_File, _V, [], _SrcS, _DstS) ->
    ok;
repair_both_both(_File, _V, [_ | _], _SrcS, _DstS) ->
    %% TODO: fetch both, check checksums, hopefully only exactly one is
    %% correct, then use that one to repair the other. If the sizes differ
    %% there may be extra corner case(s) hiding there.
    io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"),
    timer:sleep(3000),
    ok.
%% Copy each chunk described by the checksum lines in ToBeCopied from SrcS
%% to DstS. DiffBytes/DiffCount are used only for the progress message.
%% Every chunk must copy cleanly: a non-ok result from the copy fun crashes
%% with badmatch as a strong sanity check.
repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) ->
    ok;
repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) ->
    verb("\n", []),
    verb("Starting copy of ~p chunks/~s MBytes to \n ~s: ",
         [DiffCount, mbytes(DiffBytes), File]),
    CopyChunk = copy_file_proc_checksum_fun(File, SrcS, DstS, V),
    lists:foreach(fun(Line) -> ok = CopyChunk(Line) end, ToBeCopied),
    verb(" done\n", []),
    ok.
%% Fold Fun(Line, Acc) over the lines of the file at Path. Lines are
%% binaries that include their trailing newline. The file is always closed.
%% A read error raises function_clause.
file_folder(Fun, Acc, Path) ->
    {ok, FH} = file:open(Path, [read, raw, binary]),
    try file_folder2(Fun, Acc, FH)
    after file:close(FH)
    end.

%% Start folding from the current position of an open raw file.
file_folder2(Fun, Acc, FH) ->
    file_folder2(file:read_line(FH), Fun, Acc, FH).

%% Fold step: given the last read_line result, apply Fun and read the next.
file_folder2(eof, _Fun, Acc, _FH) ->
    Acc;
file_folder2({ok, Line}, Fun, Acc, FH) ->
    NextAcc = Fun(Line, Acc),
    file_folder2(file:read_line(FH), Fun, NextAcc, FH).
%% Translate repair command-line words into a proplist.
%%
%% Recognized words: check, repair, verbose, noverbose, progress,
%% delete-source, nodelete-source, nodelete-tmp. Unknown words are
%% reported and skipped. Defaults {mode, check} and
%% {delete_source, false} are appended last, so they only apply when
%% the user gave no other value (proplists return the first match).
%%
%% verb/2 and info_msg/2 read the kernel env 'verbose', not this proplist,
%% so "verbose" and "noverbose" both update that env. The last such word
%% wins, which means a later "noverbose" really turns output off again.
make_repair_props(["check"|T]) ->
    [{mode, check}|make_repair_props(T)];
make_repair_props(["repair"|T]) ->
    [{mode, repair}|make_repair_props(T)];
make_repair_props(["verbose"|T]) ->
    application:set_env(kernel, verbose, true),
    [{verbose, true}|make_repair_props(T)];
make_repair_props(["noverbose"|T]) ->
    application:set_env(kernel, verbose, false),
    [{verbose, false}|make_repair_props(T)];
make_repair_props(["progress"|T]) ->
    [{progress, true}|make_repair_props(T)];
make_repair_props(["delete-source"|T]) ->
    [{delete_source, true}|make_repair_props(T)];
make_repair_props(["nodelete-source"|T]) ->
    [{delete_source, false}|make_repair_props(T)];
make_repair_props(["nodelete-tmp"|T]) ->
    [{delete_tmp, false}|make_repair_props(T)];
make_repair_props([X|T]) ->
    io:format("Error: skipping unknown option ~p\n", [X]),
    make_repair_props(T);
make_repair_props([]) ->
    %% Proplist defaults
    [{mode, check}, {delete_source, false}].
%% Format a byte count as MiB with one decimal, e.g. 2621440 -> "2.5".
%% Exactly 0 prints "0.0". Any other size prints at least "0.1", so tiny
%% non-empty amounts never look like zero.
mbytes(0) ->
    "0.0";
mbytes(Bytes) ->
    MiB = Bytes / (1024 * 1024),
    Shown = erlang:max(0.1, MiB),
    lists:flatten(io_lib:format("~.1.0f", [Shown])).
%% Remove one trailing newline ("\n") from a binary line, if there is one.
%% A line without a trailing newline (for example the last line of a file)
%% and the empty binary are returned unchanged. Previously the final byte
%% was dropped unconditionally, losing data, and <<>> crashed.
chomp(Line) when is_binary(Line) ->
    BodyLen = byte_size(Line) - 1,
    case BodyLen >= 0 andalso binary:at(Line, BodyLen) of
        $\n -> binary:part(Line, 0, BodyLen);
        _ -> Line
    end.
%% Build a ProcFun that writes listing output to the I/O device FH:
%%   eof                     -> nothing
%%   {error, _}              -> "Error: <term>\n"
%%   {erasure_encoded, Info} -> a TODO header plus the pretty-printed Info
%%   binary or iolist        -> written as-is
make_outfun(FH) ->
    fun(eof) ->
            ok;
       ({error, _} = Error) ->
            file:write(FH, io_lib:format("Error: ~p\n", [Error]));
       ({erasure_encoded, Info}) ->
            file:write(FH, "TODO/WIP: erasure_coded:\n"),
            file:write(FH, io_lib:format("\t~p\n", [Info]));
       (Data) when is_binary(Data); is_list(Data) ->
            file:write(FH, Data)
    end.
%% Resolve an output destination. "console" maps to the 'user' device; any
%% other value is opened (created/truncated) as a file path for writing.
open_output_file(Path) ->
    case Path of
        "console" ->
            user;
        _ ->
            {ok, FH} = file:open(Path, [write]),
            FH
    end.
%% Print each {Offset, Len, File} of an upload result to FH as
%% "<offset hex> <len hex> <file>". The hex text comes from bin_to_hexstr/1
%% applied to the 64-bit offset and 32-bit length.
%% On {error, _} the error is printed to stdout and the VM halts with
%% status 1.
print_upload_details(FH, Res) ->
    case Res of
        {error, _} ->
            io:format("Error: ~p\n", [Res]),
            erlang:halt(1);
        _ ->
            [begin
                 OffsetStr = bin_to_hexstr(<<Offset:64/big>>),
                 LenStr = bin_to_hexstr(<<Len:32/big>>),
                 io:format(FH, "~s ~s ~s\n", [OffsetStr, LenStr, File])
             end || {Offset, Len, File} <- Res]
    end.
%%%%%%%%%%%%%%%%%
%% Load a projection.
%%   "new"       -> a fresh projection (epoch 0, no float maps)
%%   a directory -> <Dir>/current.proj, retried while it is missing
%%   a file      -> that file
%% Anything else raises error({bummer, Path}).
read_projection_file("new") ->
    #projection{epoch=0, last_epoch=0,
                float_map=undefined, last_float_map=undefined};
read_projection_file(Path) ->
    IsDir = filelib:is_dir(Path),
    IsFile = (not IsDir) andalso filelib:is_file(Path),
    if
        IsDir -> read_projection_file_loop(Path ++ "/current.proj");
        IsFile -> read_projection_file2(Path);
        true -> error({bummer, Path})
    end.
%% Read a single #projection{} term from Path and fill in derived fields:
%%  * last_float_map defaults to float_map when undefined;
%%  * migrating is true iff the two maps differ;
%%  * tree and last_tree are rebuilt with szone_chash:make_tree/1.
%% A missing file raises {badmatch, {error, enoent}}, which
%% read_projection_file_loop/2 relies on.
read_projection_file2(Path) ->
    {ok, [P]} = file:consult(Path),
    true = is_record(P, projection),
    #projection{float_map = FloatMap, last_float_map = StoredLast} = P,
    LastFloatMap = case StoredLast of
                       undefined -> FloatMap;
                       _ -> StoredLast
                   end,
    CurTree = szone_chash:make_tree(FloatMap),
    PrevTree = szone_chash:make_tree(LastFloatMap),
    P#projection{migrating = (FloatMap /= LastFloatMap),
                 tree = CurTree,
                 last_tree = PrevTree}.
%% Read the projection at Path, retrying every 100 ms (up to 100 attempts,
%% about 10 s) while the file does not exist. After that it raises
%% error({bummer, Path}). Any other failure propagates immediately.
read_projection_file_loop(Path) ->
    read_projection_file_loop(Path, 100).

read_projection_file_loop(Path, 0) ->
    error({bummer, Path});
read_projection_file_loop(Path, AttemptsLeft) ->
    try
        read_projection_file2(Path)
    catch
        error:{badmatch, {error, enoent}} ->
            timer:sleep(100),
            read_projection_file_loop(Path, AttemptsLeft - 1)
    end.
%% Write projection P to a new file at Path as one consultable term.
%% Crashes (badmatch) if Path already exists. The derived tree fields are
%% stripped before writing because read_projection_file2/1 rebuilds them.
write_projection(P, Path) when is_record(P, projection) ->
    {error, enoent} = file:read_file_info(Path),
    Persistable = P#projection{tree = undefined, last_tree = undefined},
    {ok, FH} = file:open(Path, [write]),
    io:format(FH, "~p.\n", [Persistable]),
    ok = file:close(FH).
%% Read a weight map from Path. The file must hold a single term: a list of
%% {ChainName :: binary(), Weight :: non-negative integer}. Any other shape
%% crashes with badmatch. Returns the list unchanged.
read_weight_map_file(Path) ->
    {ok, [Map]} = file:consult(Path),
    true = is_list(Map),
    IsValidEntry = fun({Chain, Weight}) ->
                           is_binary(Chain) andalso is_integer(Weight)
                               andalso Weight >= 0;
                      (_) ->
                           false
                   end,
    true = lists:all(IsValidEntry, Map),
    Map.
%% Load "chains.map" from the directory that stores projections and return
%% it as an orddict of its consulted terms. DirPath may be that directory
%% itself or a path inside it (e.g. a projection file); in the latter case
%% its parent directory is used.
read_chain_map_file(DirPath) ->
    ProjDir = case filelib:is_dir(DirPath) of
                  true -> DirPath;
                  false -> filename:dirname(DirPath)
              end,
    {ok, Entries} = file:consult(ProjDir ++ "/chains.map"),
    orddict:from_list(Entries).
%% Current float map of a projection.
get_float_map(#projection{float_map = FloatMap}) ->
    FloatMap.
%% Previous (pre-migration) float map of a projection.
get_last_float_map(#projection{last_float_map = LastFloatMap}) ->
    LastFloatMap.
%% Hash Key with SHA-1, scale it to a float, and look up which chain owns
%% that point in the projection's hash tree(s).
%% Returns a list of chain names:
%%   [Current]                - not migrating, or old and new owners agree
%%   [Current, Last, Current] - migrating and the owner changed
%% calc_chain/4 uses the head for writes and expands every element, in
%% order, into the candidate hosts for reads.
%% NOTE(review): presumably ?SHA_MAX is 2^160 so Float lies in [0, 1);
%% the macro is defined outside this chunk - confirm.
hash_and_query(Key, P) when is_record(P, projection) ->
<<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
Float = Int / ?SHA_MAX,
{_, Current} = szone_chash:query_tree(Float, P#projection.tree),
if P#projection.migrating ->
%% Also locate the owner under the pre-migration float map.
{_, Last} = szone_chash:query_tree(Float, P#projection.last_tree),
if Last == Current ->
[Current];
true ->
%% NOTE(review): Current appears again after Last, presumably to retry
%% the new location after trying the old one; confirm intent.
[Current, Last, Current]
end;
true ->
[Current]
end.
%% Load the projection and chain map from ProjectionPathOrDir, then compute
%% the chain for PrefixStr for a read or a write (see calc_chain/4).
%% Any other Op raises function_clause.
calc_chain(Op, ProjectionPathOrDir, PrefixStr) when Op == write; Op == read ->
    Projection = read_projection_file(ProjectionPathOrDir),
    ChainMap = read_chain_map_file(ProjectionPathOrDir),
    calc_chain(Op, Projection, ChainMap, PrefixStr).
%% Map PrefixStr to chain members using projection P and orddict ChainMap.
%%   write -> {Chain, Members} for the newest owner only
%%   read  -> {Chains, Hosts}: every candidate chain's members, each chain
%%            reversed so its tail is tried first, concatenated in order
calc_chain(write, P, ChainMap, PrefixStr) ->
    [Newest | _] = hash_and_query(PrefixStr, P),
    Members = orddict:fetch(Newest, ChainMap),
    {Newest, Members};
calc_chain(read, P, ChainMap, PrefixStr) ->
    Chains = hash_and_query(PrefixStr, P),
    TailFirst = [lists:reverse(orddict:fetch(C, ChainMap)) || C <- Chains],
    {Chains, lists:flatten(TailFirst)}.
%% Flatten [{HostBin, PortInt}] into ["Host", "Port", ...] strings, for
%% example for use as an argument list.
convert_raw_hps(HostPorts) ->
    lists:flatmap(fun({HostBin, Port}) ->
                          [binary_to_list(HostBin), integer_to_list(Port)]
                  end, HostPorts).
%% Return this process's cached connection to Host:PortStr, connecting via
%% escript_connect/2 on a miss. Both a forward entry
%% {socket_cache, Host, PortStr} -> Sock and a reverse entry
%% {socket_cache_rev, Sock} -> {Host, PortStr} are kept in the process
%% dictionary so that invalidate_cached_sock/1 can find the forward key.
get_cached_sock(Host, PortStr) ->
    CacheKey = {socket_cache, Host, PortStr},
    case erlang:get(CacheKey) of
        undefined ->
            NewSock = escript_connect(Host, PortStr),
            erlang:put(CacheKey, NewSock),
            erlang:put({socket_cache_rev, NewSock}, {Host, PortStr}),
            NewSock;
        CachedSock ->
            CachedSock
    end.
%% Close Sock (ignoring any error) and drop both of its socket-cache
%% entries from the process dictionary, if present. Always returns ok.
invalidate_cached_sock(Sock) ->
    _ = (catch gen_tcp:close(Sock)),
    RevKey = {socket_cache_rev, Sock},
    case erlang:get(RevKey) of
        {Host, PortStr} ->
            erlang:erase(RevKey),
            erlang:erase({socket_cache, Host, PortStr}),
            ok;
        undefined ->
            ok
    end.
%%%%%%%%%%%%%%%%%
%%% basho_bench callbacks
%% Passed as both the module and the first argument in run/4's
%% append_local_server operation: ?SEQ:append(?SEQ, Prefix, Value).
-define(SEQ, ?MODULE).
%% Server list used by new_common/1 when the basho_bench config has no
%% 'file0_ip_list'; entries are {Host, Port}.
-define(DEFAULT_HOSTIP_LIST, [{{127,0,0,1}, 7071}]).
%% Per-worker basho_bench driver state, created in new_common/1 and
%% threaded through run/4.
-record(bb, {
host,          % server for the non-projection ops, picked by worker Id
port_str,      % that server's port, as a string
%% sock,
proj_check_ticker_started=false, % set once check_projection_check/1 starts its timer
proj_path,     % 'file0_projection_path' config value; undefined = no projection
proj,          % projection loaded by read_projection_info/1
chain_map      % [{Chain, [{HostStr, PortStr}]}] built by read_projection_info/1
}).
%% basho_bench driver constructor.
%% Worker 1 also starts the local append server and, when the
%% 'file0_start_listener' config is {Port, DataDir}, a listener on Port for
%% DataDir. It then sleeps 100 ms (presumably so those servers are up)
%% before the common per-worker setup.
new(1 = Id) ->
    start_append_server(),
    _ = case basho_bench_config:get(file0_start_listener, no) of
            {Port, DataDir} -> start_listen_server(Port, DataDir);
            no -> ok
        end,
    timer:sleep(100),
    new_common(Id);
new(Id) ->
    new_common(Id).
%% Per-worker setup. Seeds the legacy 'random' module, which
%% check_projection_check/1 uses for ticker jitter. Picks this worker's
%% server round-robin by Id from 'file0_ip_list', and loads the projection
%% named by 'file0_projection_path', if any. Returns {ok, #bb{}}.
new_common(Id) ->
    random:seed(now()),
    ProjPath = basho_bench_config:get(file0_projection_path, undefined),
    Servers = basho_bench_config:get(file0_ip_list, ?DEFAULT_HOSTIP_LIST),
    Index = 1 + (Id rem length(Servers)),
    {Host, Port} = lists:nth(Index, Servers),
    InitState = #bb{host = Host,
                    port_str = integer_to_list(Port),
                    proj_path = ProjPath},
    {ok, read_projection_info(InitState)}.
%% basho_bench operation dispatcher. Clauses return {ok, State} on success
%% or {error, Reason, State} on failure, except where noted.
%%   null                      - no-op baseline
%%   keygen_valuegen_then_null - only exercises the key/value generators
%%   append_local_server       - append via ?SEQ:append/3 in this node
%%   append_remote_server      - append to this worker's fixed server
%%   cc_append_remote_server   - append to every member of the prefix's
%%                               chain; ok if at least one member accepted
%%   read_raw_line_local       - KeyGen yields a raw "R ..." read request,
%%                               sent to this worker's fixed server
%%   cc_read_raw_line_local    - same request, tried against the chain
%%                               members (tails first) until one succeeds
run(null, _KeyGen, _ValueGen, State) ->
{ok, State};
run(keygen_valuegen_then_null, KeyGen, ValueGen, State) ->
_Prefix = KeyGen(),
_Value = ValueGen(),
{ok, State};
run(append_local_server, KeyGen, ValueGen, State) ->
Prefix = KeyGen(),
Value = ValueGen(),
%% A non-2-tuple result crashes (badmatch) rather than returning an error.
{_, _} = ?SEQ:append(?SEQ, Prefix, Value),
{ok, State};
run(append_remote_server, KeyGen, ValueGen, State) ->
Prefix = KeyGen(),
Value = ValueGen(),
bb_do_write_chunk(Prefix, Value, State#bb.host, State#bb.port_str, State);
run(cc_append_remote_server, KeyGen, ValueGen, State0) ->
%% May reload the projection if a periodic check tick is pending.
State = check_projection_check(State0),
Prefix = KeyGen(),
Value = ValueGen(),
{_Chain, ModHPs} = calc_chain(write, State#bb.proj, State#bb.chain_map,
Prefix),
%% Count how many chain members accepted the write.
FoldFun = fun({Host, PortStr}, Acc) ->
case bb_do_write_chunk(Prefix, Value, Host, PortStr,
State) of
{ok, _} ->
Acc + 1;
_ ->
Acc
end
end,
case lists:foldl(FoldFun, 0, ModHPs) of
N when is_integer(N), N > 0 ->
{ok, State};
0 ->
{error, oh_some_problem_yo, State}
end;
run(read_raw_line_local, KeyGen, _ValueGen, State) ->
{RawLine, Size, _File} = setup_read_raw_line(KeyGen),
bb_do_read_chunk(RawLine, Size, State#bb.host, State#bb.port_str, State);
run(cc_read_raw_line_local, KeyGen, _ValueGen, State0) ->
State = check_projection_check(State0),
{RawLine, Size, File} = setup_read_raw_line(KeyGen),
%% Drop everything from the first '.' of the file name; presumably file
%% names are "<prefix>.<suffix>" - confirm against the naming scheme.
Prefix = re:replace(File, "\\..*", "", [{return, binary}]),
{_Chain, ModHPs} = calc_chain(read, State#bb.proj, State#bb.chain_map,
Prefix),
%% Try hosts in order; after the first {ok, _}, skip the rest. If all
%% fail, the last {error, _, State} is returned.
%% NOTE(review): with an empty ModHPs this returns 'undefined', which is
%% not a valid basho_bench result.
FoldFun = fun(_, {ok, _}=Acc) ->
Acc;
({Host, PortStr}, _Acc) ->
bb_do_read_chunk(RawLine, Size, Host, PortStr, State)
end,
lists:foldl(FoldFun, undefined, ModHPs).
%% Send the raw read request RawLine to Host:PortStr over the cached socket
%% and read back Size bytes of payload.
%% Returns {ok, State}, or {error, {Class, Reason}, State} on any exception.
%% A failure after the socket was obtained also evicts that socket from the
%% cache.
bb_do_read_chunk(RawLine, Size, Host, PortStr, State) ->
    try
        Sock = get_cached_sock(Host, PortStr),
        try
            Request = [RawLine, <<"\n">>],
            ok = gen_tcp:send(Sock, Request),
            read_chunk(Sock, Size, State)
        catch
            IoClass:IoReason ->
                invalidate_cached_sock(Sock),
                {error, {IoClass, IoReason}, State}
        end
    catch
        ConnClass:ConnReason ->
            {error, {ConnClass, ConnReason}, State}
    end.
%% Append Value under Prefix on Host:PortStr over the cached socket.
%% Returns {ok, State}, or {error, {Class, Reason}, State} on any exception.
%% A failure after the socket was obtained also evicts that socket from the
%% cache.
bb_do_write_chunk(Prefix, Value, Host, PortStr, State) ->
    try
        Sock = get_cached_sock(Host, PortStr),
        try
            {_Offset, _Len, _File} = upload_chunk_append(Sock, Prefix, Value),
            {ok, State}
        catch
            IoClass:IoReason ->
                invalidate_cached_sock(Sock),
                {error, {IoClass, IoReason}, State}
        end
    catch
        ConnClass:ConnReason ->
            {error, {ConnClass, ConnReason}, State}
    end.
%% Read a read-reply from Sock: the 3-byte status "OK\n", then Size payload
%% bytes, which are discarded. Returns {ok, State}. Anything else crashes
%% with badmatch.
%% NOTE(review): with Size =:= 0, gen_tcp:recv/2 returns whatever bytes are
%% available instead of zero bytes.
read_chunk(Sock, Size, State) ->
    StatusReply = gen_tcp:recv(Sock, 3),
    {ok, <<"OK\n">>} = StatusReply,
    PayloadReply = gen_tcp:recv(Sock, Size),
    {ok, _Payload} = PayloadReply,
    {ok, State}.
%% Take a raw read request "R <16 hex offset> <8 hex size> <file>" from
%% KeyGen and return {RawLine, Size, File}.
setup_read_raw_line(KeyGen) ->
    <<"R ", Args/binary>> = RawLine = KeyGen(),
    {_Offset, Size, File} = read_hex_size(Args),
    {RawLine, Size, File}.
%% Parse "<16 hex offset> <8 hex size> <file>" into {Offset, Size, File}.
%% Offset is decoded as 64-bit and Size as 32-bit big-endian. File is the
%% rest of the binary, verbatim.
read_hex_size(Line) ->
    <<OffsetHex:16/binary, $\s, SizeHex:8/binary, $\s, File/binary>> = Line,
    OffsetBin = hexstr_to_bin(OffsetHex),
    SizeBin = hexstr_to_bin(SizeHex),
    <<Offset:64/big>> = OffsetBin,
    <<Size:32/big>> = SizeBin,
    {Offset, Size, File}.
%% (Re)load the projection and chain map into State when a projection path
%% is configured. Chain members are converted from {HostBin, PortInt} to
%% {HostStr, PortStr}, the form get_cached_sock/2 is called with.
read_projection_info(#bb{proj_path=undefined}=State) ->
    State;
read_projection_info(#bb{proj_path=PathOrDir}=State) ->
    Proj = read_projection_file(PathOrDir),
    RawChainMap = read_chain_map_file(PathOrDir),
    Stringify = fun(Members) ->
                        [{binary_to_list(H), integer_to_list(P)} ||
                            {H, P} <- Members]
                end,
    ChainMap = [{Chain, Stringify(Members)} || {Chain, Members} <- RawChainMap],
    State#bb{proj = Proj, chain_map = ChainMap}.
%% Periodic projection refresh.
%% On the first call, starts a timer that sends 'projection_check' to this
%% process every 4500-4999 ms (5000 ms minus random jitter of 1..500 ms).
%% On every call, if a tick is already queued, one tick is consumed and the
%% projection is reloaded. Otherwise State is returned unchanged without
%% blocking.
check_projection_check(#bb{proj_check_ticker_started=true} = State) ->
    receive
        projection_check -> read_projection_info(State)
    after 0 ->
        State
    end;
check_projection_check(#bb{proj_check_ticker_started=false} = State) ->
    Jitter = random:uniform(500),
    timer:send_interval(5 * 1000 - Jitter, projection_check),
    check_projection_check(State#bb{proj_check_ticker_started=true}).