diff --git a/prototype/demo-day-hack/README.md b/prototype/demo-day-hack/README.md new file mode 100644 index 0000000..57feaf5 --- /dev/null +++ b/prototype/demo-day-hack/README.md @@ -0,0 +1,233 @@ +## Basic glossary/summary/review for the impatient + +* **Machi**: An immutable, append-only distributed file storage service. + * For more detail, see: [https://github.com/basho/internal_wiki/wiki/RFCs#riak-cs-rfcs](https://github.com/basho/internal_wiki/wiki/RFCs#riak-cs-rfcs) +* **Machi cluster**: A small cluster of machines, e.g. 2 or 3, which form a single Machi cluster. + * All cluster data is replicated via Chain Replication. + * In nominal/full-repair state, all servers store exactly the same copies of all files. +* **Strong Consistency** and **CP-style operation**: Not here, please look elsewhere. This is an eventual consistency joint. +* **Sequencer**: Logical component that specifies where any new data will be stored. + * For each chunk of data written to a Machi server, the sequencer is solely responsible for assigning a file name and byte offset within that file. + * [http://martinfowler.com/bliki/TwoHardThings.html](http://martinfowler.com/bliki/TwoHardThings.html) +* **Server**: A mostly-dumb file server, with fewer total functions than an NFS version 2 file server. + * Compare with [https://tools.ietf.org/html/rfc1094#section-2.2](https://tools.ietf.org/html/rfc1094#section-2.2) +* **Chunk**: The basic storage unit of Machi: an ordered sequence of bytes. Each chunk is stored together with its MD5 checksum. + * Think of a single Erlang binary term... +* **File**: A strictly-ordered collection of chunks. +* **Chain Replication**: A variation of master/slave replication where writes and reads are strictly ordered in a "chain-like" manner +* **Chain Repair**: Identify chunks that are missing between servers in a chain and then fix them. +* **Cluster of clusters**: A desirable rainbow unicorn with purple fur and files automagically distributed across many Machi clusters, scalable to infinite size within a single data center. +* **Projection**: A data structure which specifies file distribution and file migration across a cluster of clusters. + * Also used by a single Machi cluster to determine the current membership & status of the cluster & its chain. +* **Erasure Coding**: A family of techniques of encoding data with redundant data. The redundant data is smaller than the original data, yet can be used to reconstruct the original data if some original parts are lost/destroyed. + +## Present in this POC + +* Basic sequencer and file server + * Sequencer and file server are combined for efficiency + * Basic ops: append chunk to file (with implicit sequencer assignment), read chunk, list files, fetch chunk checksums (by file) + * Chain replication ops: write chunk (location already determined by sequencer), delete file (cluster-of-clusters file migration use only), truncate file (cluster-of-clusters + erasure coding use only) + +* Basic file clients + * Write chunks to a single server (via sequencer) and read from a single server. + * Write and read chunks to multiple servers, using both projection and chain replication + * All chain replication and projection features are implemented on the client side. + +* Basic cluster-of-cluster functions + * Hibari style consistent hashing for file uploads & downloads + * There is no ring. + * Arbitrary weighted distribution of files across multiple clusters + * Append/write/read ops are automatically routed to the correct chain/cluster. + * File migration/rebalancing + +* Administrative features + * Verify chunk checksums in a file. + * Find and fix missing chunks between any two servers in a single cluster. + * Normal cluster operations are uninterrupted. + * Create new projection definitions + * Demonstrate file -> cluster mapping via projection + * Identify files in current projection that must be moved in the new projection + * ... and move them to the new projection + * Normal cluster operations are uninterrupted. + +* Erasure coding implementation + * Inefficient & just barely enough to work + * No NIF or Erlang port integration + * Reed-Solomon 10,4 encoding only + * 10 independent sub-chunks of original data + * 4 independent sub-chunks of parity data + * Use any 10 sub-chunks to reconstruct all original and parity data + * Encode an entire file and write sub-chunks to a secondary projection of 14 servers + * Download chunks automatically via regular projection (for replicated chunks) or a secondary projection (for EC-coded sub-chunks). + +## Missing from this POC + +* Chunk immutability is not enforced +* Read-repair ... no good reason, I just ran out of 2014 to implement it +* Per-chunk metadata +* Repair of erasure coded missing blocks: all major pieces are there, but I ran out of 2014 here, also. +* Automation: all chain management functions are manual, no health/status monitoring, etc. + +## Simple examples + +Run a server: + + ./file0_server.escript file0_server 7071 /tmp/path-to-data-dir + +Upload chunks of a local file in 1 million byte chunks: + + ./file0_write_client.escript localhost 7071 1000000 someprefix /path/to/some/local/file + +To fetch the chunks, we need a list of filenames, offsets, and chunk +sizes. The easiest way to get such a list is to save the output from +`./file0_write_client.escript` when uploading the file in the first +place. For example, save the following command's output into some +file e.g. `/tmp/input`. Then: + + ./file0_read_client.escript localhost 7071 /tmp/input + +## More details + +### Server protocol + +All negative server responses start with `ERROR` unless otherwise noted. + +* No distributed Erlang message passing ... "just ASCII on a socket". + +* `A LenHex Prefix` ... Append a chunk of data to a file with name prefix `Prefix`. File name & offset will be assigned by server-side sequencer. + * `OK OffsetHex Filename` + +* `R OffsetHex LenHex Filename` ... Read a chunk from `OffsetHex` for length `Lenhex` from file `FileName`. + * `OK [chunk follows]` + +* `L` ... List all files stored on the server, one per line + * `OK [list of files & file sizes, ending with .]` + +* `C Filename` ... Fetch chunk offset & checksums + * `OK [list of chunk offset, lengths, and checksums, ending with .]` + +* `QUIT` ... Close connection + * `OK` + +* `W-repl OffsetHex LenHex Filename` ... Write/replicate a chunk of data to a file with an `OffsetHex` that has already been assigned by a sequencer. + * `OK` + +* `TRUNC-hack--- Filename` ... Truncate a file (after erasure encoding is successful). + * `OK` + +### Start a server, upload & download chunks to a single server + +Mentioned above. + +### Upload chunks to a cluster (members specified manually) + + ./file0_1file_write_redundant.escript 1000000 otherprefix /path/to/local/file localhost 7071 localhost 7072 + +### List files (and file sizes) on a server + + ./file0_list.escript localhost 7071 + +### List all chunk sizes & checksums for a single file + + setenv M_FILE otherprefix.KCL2CC9.1 + ./file0_checksum_list.escript localhost 7071 $M_FILE + +### Verify checksums in a file + + ./file0_verify_checksums.escript localhost 7071 $M_FILE + +### Compare two servers and file all missing files + +TODO script is broken, but is just a proof-of-concept for early repair work. + + $ ./file0_compare_filelists.escript localhost 7071 localhost 7072 + {legend, {file, list_of_servers_without_file}}. + {all, [{"localhost","7071"},{"localhost","7072"}]}. + +### Repair server A -> server B, replicating all missing data + + ./file0_repair_server.escript localhost 7071 localhost 7072 verbose check + ./file0_repair_server.escript localhost 7071 localhost 7072 verbose repair + ./file0_repair_server.escript localhost 7071 localhost 7072 verbose repair + +And then repair in the reverse direction: + + ./file0_repair_server.escript localhost 7072 localhost 7071 verbose check + ./file0_repair_server.escript localhost 7072 localhost 7071 verbose repair + ./file0_repair_server.escript localhost 7072 localhost 7071 verbose repair + +### Projections: how to create & use them + +* Easiest use: all projection data is stored in a single directory; use path to the directory for all PoC scripts +* Files in the projection directory: + * `chains.map` + * Define all members of all chains at the current time + * `*.weight` + * Define chain capacity weights + * `*.proj` + * Define a projection (based on previous projection file + weight file) + +#### Sample chains.map file + + %% Chain names (key): please use binary + %% Chain members (value): two-tuples of binary hostname and integer TCP port + + {<<"chain1">>, [{<<"localhost">>, 7071}]}. + {<<"chain10">>, [{<<"localhost">>, 7071}, {<<"localhost">>, 7072}]}. + +#### Sample weight file + + %% Please use binaries for chain names + + [ + {<<"chain1">>, 1000} + ]. + +#### Sample projection file + + %% Created by: + % ./file0_cc_make_projection.escript new examples/weight_map1.weight examples/1.proj + + {projection,1,0,[{<<"chain1">>,1.0}],undefined,undefined,undefined,undefined}. + +### Create the `examples/1.proj` file + + ./file0_cc_make_projection.escript new examples/weight_map_ec1.weight examples/ec1.proj + +### Demo consistent hashing of file prefixes: what chain do we map to? + + ./file0_cc_map_prefix.escript examples/1.proj foo-prefix + ./file0_cc_map_prefix.escript examples/3.proj foo-prefix + ./file0_cc_map_prefix.escript examples/56.proj foo-prefix + +### Write replica chunks to a chain via projection + + ./file0_cc_1file_write_redundant.escript 4444 some-prefix /path/to/local/file ./examples/1.proj + +### Read replica chunks to a chain via projection + + ./file0_cc_read_client.escript examples/1.proj /tmp/input console + +### Change of projection: migrate 1 server's files from old to new projection + + ./file0_cc_migrate_files.escript examples/2.proj localhost 7071 verbose check + ./file0_cc_migrate_files.escript examples/2.proj localhost 7071 verbose repair delete-source + +### Erasure encode a file, then delete the original replicas, then fetch via EC sub-chunks + +The current prototype fetches the file directly from the file system, rather than fetching the file over the network via the `RegularProjection` projection definition. + +WARNING: For this part of demo to work correctly, `$M_PATH` must be a +file that exists on `localhost 7071` and not on `localhost 7072` or +any other server. + + setenv M_PATH /tmp/SAM1/seq-tests/data1/otherprefix.77Z34TQ.1 + ./file0_cc_ec_encode.escript examples/1.proj $M_PATH /tmp/zooenc examples/ec1.proj verbose check nodelete-tmp + + less $M_PATH + ./file0_cc_ec_encode.escript examples/1.proj $M_PATH /tmp/zooenc examples/ec1.proj verbose repair progress delete-source + less $M_PATH + ./file0_read_client.escript localhost 7071 /tmp/input + ./file0_read_client.escript localhost 7072 /tmp/input + ./file0_cc_read_client.escript examples/1.proj /tmp/input console examples/55.proj + diff --git a/prototype/demo-day-hack/REPAIR-SORT-JOIN.sh b/prototype/demo-day-hack/REPAIR-SORT-JOIN.sh new file mode 100644 index 0000000..02cfc04 --- /dev/null +++ b/prototype/demo-day-hack/REPAIR-SORT-JOIN.sh @@ -0,0 +1,26 @@ +#!/bin/sh + +trap "rm -f $1.tmp $2.tmp" 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + +if [ $# -ne 6 ]; then + echo usage: $0 1. 2. 3. 4. 5. 6. + exit 1 +fi + +sort -u $1 > $1.tmp +sort -u $2 > $2.tmp +cmp -s $1.tmp $2.tmp +if [ $? -eq 0 ]; then + ## Output is identical. + touch $6 + exit 0 +fi + +# print only lines joinable on field 1 (offset) +join -1 1 -2 1 $1.tmp $2.tmp > $3 + +# print only lines field 1 (offset) present only in file 1 +join -v 1 -1 1 -2 1 $1.tmp $2.tmp > $4 + +# print only lines field 1 (offset) present only in file 2 +join -v 2 -1 1 -2 1 $1.tmp $2.tmp > $5 diff --git a/prototype/demo-day-hack/cc.hrl b/prototype/demo-day-hack/cc.hrl new file mode 100644 index 0000000..4b431b6 --- /dev/null +++ b/prototype/demo-day-hack/cc.hrl @@ -0,0 +1,33 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-record(projection, { + %% hard state + epoch :: non_neg_integer(), + last_epoch :: non_neg_integer(), + float_map, + last_float_map, + %% soft state + migrating :: boolean(), + tree, + last_tree + }). + +-define(SHA_MAX, (1 bsl (20*8))). diff --git a/prototype/demo-day-hack/examples/1.proj b/prototype/demo-day-hack/examples/1.proj new file mode 100644 index 0000000..be88732 --- /dev/null +++ b/prototype/demo-day-hack/examples/1.proj @@ -0,0 +1,4 @@ +%% Created by: +% ./file0_cc_make_projection.escript new examples/weight_map1.weight examples/1.proj + +{projection,1,0,[{<<"chain1">>,1.0}],undefined,undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/2.proj b/prototype/demo-day-hack/examples/2.proj new file mode 100644 index 0000000..1a8900c --- /dev/null +++ b/prototype/demo-day-hack/examples/2.proj @@ -0,0 +1,7 @@ +% Created by: +% ./file0_cc_make_projection.escript examples/1.proj examples/weight_map2.weight examples/2.proj + +{projection,2,1, + [{<<"chain1">>,0.5},{<<"chain2">>,0.5}], + [{<<"chain1">>,1.0}], + undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/3.proj b/prototype/demo-day-hack/examples/3.proj new file mode 100644 index 0000000..e2bbe65 --- /dev/null +++ b/prototype/demo-day-hack/examples/3.proj @@ -0,0 +1,10 @@ +% Created by: +% ./file0_cc_make_projection.escript examples/2.proj examples/weight_map3.weight examples/3.proj + +{projection,3,2, + [{<<"chain1">>,0.40816326530612246}, + {<<"chain3">>,0.09183673469387754}, + {<<"chain2">>,0.40816326530612246}, + {<<"chain3">>,0.09183673469387754}], + [{<<"chain1">>,0.5},{<<"chain2">>,0.5}], + undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/55.proj b/prototype/demo-day-hack/examples/55.proj new file mode 100644 index 0000000..944d11f --- /dev/null +++ b/prototype/demo-day-hack/examples/55.proj @@ -0,0 +1 @@ +{projection,1,0,[{<<"chain10">>,1.0}],undefined,undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/56.proj b/prototype/demo-day-hack/examples/56.proj new file mode 100644 index 0000000..b95933a --- /dev/null +++ b/prototype/demo-day-hack/examples/56.proj @@ -0,0 +1,5 @@ +{projection,2,1, + [{<<"chain10">>,0.9090909090909091}, + {<<"chain11">>,0.09090909090909094}], + [{<<"chain10">>,1.0}], + undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/chains.map b/prototype/demo-day-hack/examples/chains.map new file mode 100644 index 0000000..c2349b3 --- /dev/null +++ b/prototype/demo-day-hack/examples/chains.map @@ -0,0 +1,27 @@ +%% Chain names (key): please use binary +%% Chain members (value): two-tuples of binary hostname and integer TCP port + +{<<"chain1">>, [{<<"localhost">>, 7071}]}. +{<<"chain2">>, [{<<"localhost">>, 7072}]}. +{<<"chain3">>, [{<<"localhost">>, 7073}]}. +{<<"chain4">>, [{<<"localhost">>, 7074}]}. +{<<"chain5">>, [{<<"localhost">>, 7075}]}. +{<<"chain6">>, [{<<"localhost">>, 7076}]}. + +{<<"chain10">>, [{<<"localhost">>, 7071}, {<<"localhost">>, 7072}]}. +{<<"chain11">>, [{<<"localhost">>, 7073}, {<<"localhost">>, 7074}]}. +{<<"chain12">>, [{<<"localhost">>, 7075}, {<<"localhost">>, 7076}]}. + +%% HACK ALERT +%% I'm being lazy here -- normally all members of a chain contain identical +%% data. In case of erasure coding, I'm using this hack to demonstrate +%% placement policy. The **caller** will interpret the chain membership +%% differently: for EC chains, the call assumes that each server in the +%% chain list will store one copy of one of the data/parity stripes. +{<<"ec-1-rs-10-4">>, [{<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}, + {<<"localhost">>, 7072}, {<<"localhost">>, 7072}]}. diff --git a/prototype/demo-day-hack/examples/ec1.proj b/prototype/demo-day-hack/examples/ec1.proj new file mode 100644 index 0000000..8f5540b --- /dev/null +++ b/prototype/demo-day-hack/examples/ec1.proj @@ -0,0 +1,6 @@ +%% Created by: +%% ./file0_cc_make_projection.escript new examples/weight_map_ec1.weight examples/ec1.proj + +{projection,1,0, + [{<<"ec-1-rs-10-4">>,1.0}], + undefined,undefined,undefined,undefined}. diff --git a/prototype/demo-day-hack/examples/servers.map b/prototype/demo-day-hack/examples/servers.map new file mode 100644 index 0000000..7ab0b3d --- /dev/null +++ b/prototype/demo-day-hack/examples/servers.map @@ -0,0 +1,9 @@ +%% Server names (key): please use {binary(), integer()} +%% Proplist (value): data_dir is mandatory + +{ {<<"localhost">>, 7071}, [{data_dir, "/tmp/file0/server1"}] }. +{ {<<"localhost">>, 7072}, [{data_dir, "/tmp/file0/server2"}] }. +{ {<<"localhost">>, 7073}, [{data_dir, "/tmp/file0/server3"}] }. +{ {<<"localhost">>, 7074}, [{data_dir, "/tmp/file0/server4"}] }. +{ {<<"localhost">>, 7075}, [{data_dir, "/tmp/file0/server5"}] }. +{ {<<"localhost">>, 7076}, [{data_dir, "/tmp/file0/server6"}] }. diff --git a/prototype/demo-day-hack/examples/weight_map1.weight b/prototype/demo-day-hack/examples/weight_map1.weight new file mode 100644 index 0000000..09eb812 --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map1.weight @@ -0,0 +1,5 @@ +%% Please use binaries for chain names + +[ + {<<"chain1">>, 1000} +]. diff --git a/prototype/demo-day-hack/examples/weight_map2.weight b/prototype/demo-day-hack/examples/weight_map2.weight new file mode 100644 index 0000000..28f8b32 --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map2.weight @@ -0,0 +1,6 @@ +%% Please use binaries for chain names + +[ + {<<"chain1">>, 1000}, + {<<"chain2">>, 1000} +]. diff --git a/prototype/demo-day-hack/examples/weight_map3.weight b/prototype/demo-day-hack/examples/weight_map3.weight new file mode 100644 index 0000000..7e8360d --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map3.weight @@ -0,0 +1,7 @@ +%% Please use binaries for chain names + +[ + {<<"chain1">>, 1000}, + {<<"chain2">>, 1000}, + {<<"chain3">>, 450} +]. diff --git a/prototype/demo-day-hack/examples/weight_map55.weight b/prototype/demo-day-hack/examples/weight_map55.weight new file mode 100644 index 0000000..f5ee40c --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map55.weight @@ -0,0 +1,5 @@ +%% Please use binaries for chain names + +[ + {<<"chain10">>, 1000} +]. diff --git a/prototype/demo-day-hack/examples/weight_map56.weight b/prototype/demo-day-hack/examples/weight_map56.weight new file mode 100644 index 0000000..0ef3c3e --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map56.weight @@ -0,0 +1,6 @@ +%% Please use binaries for chain names + +[ + {<<"chain10">>, 1000}, + {<<"chain11">>, 100} +]. diff --git a/prototype/demo-day-hack/examples/weight_map_ec1.weight b/prototype/demo-day-hack/examples/weight_map_ec1.weight new file mode 100644 index 0000000..1b74886 --- /dev/null +++ b/prototype/demo-day-hack/examples/weight_map_ec1.weight @@ -0,0 +1,5 @@ +%% Please use binaries for chain names + +[ + {<<"ec-1-rs-10-4">>, 1000} +]. diff --git a/prototype/demo-day-hack/file0.config b/prototype/demo-day-hack/file0.config index 8f121c9..5f6af21 100644 --- a/prototype/demo-day-hack/file0.config +++ b/prototype/demo-day-hack/file0.config @@ -1,30 +1,40 @@ -%% {mode, {rate,10}}. -{mode, max}. +{mode, {rate,50}}. +%% {mode, max}. {duration, 1}. +%{duration, 15}. {report_interval, 1}. -{concurrent, 46}. +%% {concurrent, 1}. +{concurrent, 32}. {code_paths, ["/tmp"]}. {driver, file0}. - -% {file0_start_listener, 7071}. - +{file0_ip_list, [{{127,0,0,1}, 7071}]}. +%%% {file0_start_listener, {7071, "/tmp/SAM1/seq-tests/data"}}. {value_generator_source_size, 16001001}. -{key_generator, <<"32k-archive">>}. +%{key_generator, <<"32k-archive">>}. +{key_generator, {to_binstr, "32k-prefix-~w", {uniform_int, 1000}}}. +{file0_projection_path, "/Users/fritchie/b/src/misc/sequencer/examples/1.proj"}. %% variations of put -{operations, [{keygen_valuegen_then_null, 1}]}. +%{operations, [{keygen_valuegen_then_null, 1}]}. {value_generator, {fixed_bin, 32768}}. %{operations, [{append_local_server, 1}]}. -{operations, [{append_remote_server, 1}]}. +%{operations, [{append_remote_server, 1}]}. +%{operations, [{cc_append_remote_server, 1}]}. +{operations, [{append_remote_server, 1}, {cc_append_remote_server, 1}]}. %{operations, [{append_local_server, 1}, {append_remote_server, 1}]}. %% variations of get -%% %% perl -e 'srand(time); $chunksize = 32768 ; foreach $file (@ARGV) { @fi = stat($file); $size = $fi[7]; $base = $file; $base =~ s|[^/]*/||g; for ($i = 0; $i < $size; $i += $chunksize) { printf "%d R %016x %08x %s\n", rand(999999), $i, $chunksize, $base; } }' data/* | sort -n | awk '{print $2, $3, $4, $5}' > /tmp/input.txt +%% perl -e 'srand(time); $chunksize = 32768 ; foreach $file (@ARGV) { @fi = stat($file); $size = $fi[7]; $base = $file; $base =~ s|[^/]*/||g; for ($i = 0; $i < $size - $chunksize; $i += $chunksize) { printf "%d R %016x %08x %s\n", rand(999999), $i, $chunksize, $base; } }' data/*.* | sort -n | awk '{print $2, $3, $4, $5}' > /tmp/input.txt %% {key_generator, {file_line_bin, "/tmp/input.txt"}}. %% {value_generator_source_size, 1}. %% {operations, [{read_raw_line_local, 1}]}. + +%% {key_generator, {file_line_bin, "/tmp/input.txt"}}. +%% {value_generator_source_size, 1}. +%% %{operations, [{cc_read_raw_line_local, 1}]}. +%% {operations, [{read_raw_line_local, 1}, {cc_read_raw_line_local, 1}]}. diff --git a/prototype/demo-day-hack/file0.erl b/prototype/demo-day-hack/file0.erl index 1ad6ac6..bcda2e1 100644 --- a/prototype/demo-day-hack/file0.erl +++ b/prototype/demo-day-hack/file0.erl @@ -24,25 +24,24 @@ -endif. %% -mode(compile). % for escript use +-include("cc.hrl"). -include_lib("kernel/include/file.hrl"). --define(MAX_FILE_SIZE, 256*1024*1024*1024). % 256 GBytes +-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes %% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data"). -define(DATA_DIR, "./data"). +-define(MINIMUM_OFFSET, 1024). append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) -> - Server ! {seq_append, self(), Prefix, Chunk}, + CSum = checksum(Chunk), + Server ! {seq_append, self(), Prefix, Chunk, CSum}, receive - {assignment, File, Offset} -> - {File, Offset} + {assignment, Offset, File} -> + {Offset, File} after 10*1000 -> bummer end. -append_direct(Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) -> - RegName = make_regname(Prefix), - append(RegName, Prefix, Chunk). - start_append_server() -> start_append_server(?MODULE). @@ -70,8 +69,8 @@ run_listen_server(Port, DataDir) -> append_server_loop(DataDir) -> receive - {seq_append, From, Prefix, Chunk} -> - spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, + {seq_append, From, Prefix, Chunk, CSum} -> + spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) end), append_server_loop(DataDir) end. @@ -85,31 +84,37 @@ net_server_loop(Sock, DataDir) -> ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0, 60*1000) of {ok, Line} -> - %% fmt("Got: ~p\n", [Line]), + %% verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1, PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2, FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1, FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2, + CSumFileLenLF = byte_size(Line) - 2 - 1, + CSumFileLenCRLF = byte_size(Line) - 2 - 2, WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1, DelFileLenLF = byte_size(Line) - 14 - 1, case Line of %% For normal use - <<"A ", HexLen:8/binary, " ", + <<"A ", LenHex:8/binary, " ", Prefix:PrefixLenLF/binary, "\n">> -> - do_net_server_append(Sock, HexLen, Prefix); - <<"A ", HexLen:8/binary, " ", + do_net_server_append(Sock, LenHex, Prefix); + <<"A ", LenHex:8/binary, " ", Prefix:PrefixLenCRLF/binary, "\r\n">> -> - do_net_server_append(Sock, HexLen, Prefix); - <<"R ", HexOffset:16/binary, " ", HexLen:8/binary, " ", + do_net_server_append(Sock, LenHex, Prefix); + <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ", File:FileLenLF/binary, "\n">> -> - do_net_server_read(Sock, HexOffset, HexLen, File, DataDir); - <<"R ", HexOffset:16/binary, " ", HexLen:8/binary, " ", + do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir); + <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ", File:FileLenCRLF/binary, "\r\n">> -> - do_net_server_read(Sock, HexOffset, HexLen, File, DataDir); + do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir); <<"L\n">> -> do_net_server_listing(Sock, DataDir); <<"L\r\n">> -> do_net_server_listing(Sock, DataDir); + <<"C ", File:CSumFileLenLF/binary, "\n">> -> + do_net_server_checksum_listing(Sock, File, DataDir); + <<"C ", File:CSumFileLenCRLF/binary, "\n">> -> + do_net_server_checksum_listing(Sock, File, DataDir); <<"QUIT\n">> -> catch gen_tcp:close(Sock), exit(normal); @@ -117,15 +122,18 @@ net_server_loop(Sock, DataDir) -> catch gen_tcp:close(Sock), exit(normal); %% For "internal" replication only. - <<"W-repl ", HexOffset:16/binary, " ", HexLen:8/binary, " ", + <<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ", File:WriteFileLenLF/binary, "\n">> -> - do_net_server_write(Sock, HexOffset, HexLen, File, DataDir); + do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir); %% For data migration only. <<"DEL-migration ", File:DelFileLenLF/binary, "\n">> -> do_net_server_delete_migration_only(Sock, File, DataDir); + %% For erasure coding hackityhack + <<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> -> + do_net_server_truncate_hackityhack(Sock, File, DataDir); _ -> - fmt("Else Got: ~p\n", [Line]), - gen_tcp:send(Sock, "ERROR\n"), + verb("Else Got: ~p\n", [Line]), + gen_tcp:send(Sock, "ERROR SYNTAX\n"), catch gen_tcp:close(Sock), exit(normal) end, @@ -135,13 +143,18 @@ net_server_loop(Sock, DataDir) -> exit(normal) end. -do_net_server_append(Sock, HexLen, Prefix) -> - <> = hexstr_to_bin(HexLen), +do_net_server_append(Sock, LenHex, Prefix) -> + <> = hexstr_to_bin(LenHex), ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), - ?MODULE ! {seq_append, self(), Prefix, Chunk}, + CSum = checksum(Chunk), + try + ?MODULE ! {seq_append, self(), Prefix, Chunk, CSum} + catch error:badarg -> + error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) + end, receive - {assignment, File, Offset} -> + {assignment, Offset, File} -> OffsetHex = bin_to_hexstr(<>), Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]), ok = gen_tcp:send(Sock, Out) @@ -149,24 +162,30 @@ do_net_server_append(Sock, HexLen, Prefix) -> ok = gen_tcp:send(Sock, "TIMEOUT\n") end. -do_net_server_read(Sock, HexOffset, HexLen, FileBin, DataDir) -> +do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of {ok, Bytes} when byte_size(Bytes) == Len -> gen_tcp:send(Sock, ["OK\n", Bytes]); + {ok, Bytes} -> + verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n", + [Len, size(Bytes), FileBin, Offset]), + ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n"); + eof -> + perhaps_do_net_server_ec_read(Sock, FH); _Else2 -> - fmt("Else2 ~p ~p ~p\n", - [Offset, Len, _Else2]), - ok = gen_tcp:send(Sock, "ERROR\n") + verb("Else2 ~p ~p ~P\n", + [Offset, Len, _Else2, 20]), + ok = gen_tcp:send(Sock, "ERROR BAD-READ\n") end end, - do_net_server_readwrite_common(Sock, HexOffset, HexLen, FileBin, DataDir, + do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [read, binary, raw], DoItFun). -do_net_server_readwrite_common(Sock, HexOffset, HexLen, FileBin, DataDir, +do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun) -> - <> = hexstr_to_bin(HexOffset), - <> = hexstr_to_bin(HexLen), + <> = hexstr_to_bin(OffsetHex), + <> = hexstr_to_bin(LenHex), {_, Path} = make_data_filename(DataDir, FileBin), OptsHasWrite = lists:member(write, FileOpts), case file:open(Path, FileOpts) of @@ -179,30 +198,55 @@ do_net_server_readwrite_common(Sock, HexOffset, HexLen, FileBin, DataDir, {error, enoent} when OptsHasWrite -> ok = filelib:ensure_dir(Path), do_net_server_readwrite_common( - Sock, HexOffset, HexLen, FileBin, DataDir, + Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun); _Else -> - fmt("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), - ok = gen_tcp:send(Sock, "ERROR\n") + %%%%%% keep?? verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), + ok = gen_tcp:send(Sock, "ERROR BAD-IO\n") end. -do_net_server_write(Sock, HexOffset, HexLen, FileBin, DataDir) -> - DoItFun = fun(FH, Offset, Len) -> +do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) -> + CSumPath = make_checksum_filename(DataDir, FileBin), + case file:open(CSumPath, [append, raw, binary, delayed_write]) of + {ok, FHc} -> + do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc); + {error, enoent} -> + ok = filelib:ensure_dir(CSumPath), + do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) + end. + +do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) -> + DoItFun = fun(FHd, Offset, Len) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len), - case file:pwrite(FH, Offset, Chunk) of + CSum = checksum(Chunk), + case file:pwrite(FHd, Offset, Chunk) of ok -> + CSumHex = bin_to_hexstr(CSum), + CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], + ok = file:write(FHc, CSum_info), + ok = file:close(FHc), gen_tcp:send(Sock, <<"OK\n">>); - _Else2 -> - fmt("Else2 ~p ~p ~p\n", - [Offset, Len, _Else2]), - ok = gen_tcp:send(Sock, "ERROR\n") + _Else3 -> + verb("Else3 ~p ~p ~p\n", + [Offset, Len, _Else3]), + ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n") end end, - do_net_server_readwrite_common(Sock, HexOffset, HexLen, FileBin, DataDir, + do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [write, read, binary, raw], DoItFun). +perhaps_do_net_server_ec_read(Sock, FH) -> + case file:pread(FH, 0, ?MINIMUM_OFFSET) of + {ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET -> + decode_and_reply_net_server_ec_read(Sock, Bin); + {ok, _AnythingElse} -> + ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n"); + _AnythingElse -> + ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n") + end. + do_net_server_listing(Sock, DataDir) -> Files = filelib:wildcard("*", DataDir) -- ["config"], Out = ["OK\n", @@ -217,15 +261,77 @@ do_net_server_listing(Sock, DataDir) -> ], ok = gen_tcp:send(Sock, Out). +do_net_server_checksum_listing(Sock, File, DataDir) -> + CSumPath = make_checksum_filename(DataDir, File), + case file:open(CSumPath, [read, raw, binary]) of + {ok, FH} -> + {ok, FI} = file:read_file_info(CSumPath), + Len = FI#file_info.size, + LenHex = list_to_binary(bin_to_hexstr(<>)), + %% Client has option of line-by-line with "." terminator, + %% or using the offset in the OK message to slurp things + %% down by exact byte size. + ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]), + do_net_copy_bytes(FH, Sock), + ok = file:close(FH), + ok = gen_tcp:send(Sock, ".\n"); + {error, enoent} -> + ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); + _ -> + ok = gen_tcp:send(Sock, "ERROR\n") + end. + +do_net_copy_bytes(FH, Sock) -> + case file:read(FH, 1024*1024) of + {ok, Bin} -> + ok = gen_tcp:send(Sock, Bin), + do_net_copy_bytes(FH, Sock); + eof -> + ok + end. + do_net_server_delete_migration_only(Sock, File, DataDir) -> {_, Path} = make_data_filename(DataDir, File), case file:delete(Path) of ok -> ok = gen_tcp:send(Sock, "OK\n"); + {error, enoent} -> + ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); _ -> ok = gen_tcp:send(Sock, "ERROR\n") end. +do_net_server_truncate_hackityhack(Sock, File, DataDir) -> + {_, Path} = make_data_filename(DataDir, File), + case file:open(Path, [read, write, binary, raw]) of + {ok, FH} -> + try + {ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET), + ok = file:truncate(FH), + ok = gen_tcp:send(Sock, "OK\n") + after + file:close(FH) + end; + {error, enoent} -> + ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); + _ -> + ok = gen_tcp:send(Sock, "ERROR\n") + end. + +decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) -> + decode_and_reply_net_server_ec_read_version_a(Sock, Rest); +decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) -> + ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>). + +decode_and_reply_net_server_ec_read_version_a(Sock, Rest) -> + %% <> = Rest, + HdrLen = 80 - 2 - 4 - 1, + <> = Rest, + <> = hexstr_to_bin(BodyLenHex), + <> = Rest2, + ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). + write_server_get_pid(Prefix, DataDir) -> RegName = make_regname(Prefix), case whereis(RegName) of @@ -237,10 +343,9 @@ write_server_get_pid(Prefix, DataDir) -> Pid end. -append_server_dispatch(From, Prefix, Chunk, DataDir) -> - %% _ = crypto:hash(md5, Chunk), +append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) -> Pid = write_server_get_pid(Prefix, DataDir), - Pid ! {seq_append, From, Prefix, Chunk}, + Pid ! {seq_append, From, Prefix, Chunk, CSum}, exit(normal). start_seq_append_server(Prefix, DataDir) -> @@ -259,27 +364,44 @@ run_seq_append_server2(Prefix, DataDir) -> seq_append_server_loop(DataDir, Prefix, FileNum). seq_append_server_loop(DataDir, Prefix, FileNum) -> - {File, FullPath} = make_data_filename(DataDir, Prefix, FileNum), - {ok, FH} = file:open(FullPath, - [write, binary, raw]), - %% [write, binary, raw, delayed_write]), - seq_append_server_loop(DataDir, Prefix, File, FH, FileNum, 0). + SequencerNameHack = lists:flatten(io_lib:format( + "~.36B~.36B", + [element(3,now()), + list_to_integer(os:getpid())])), + {File, FullPath} = make_data_filename(DataDir, Prefix, SequencerNameHack, + FileNum), + {ok, FHd} = file:open(FullPath, + [write, binary, raw]), + %% [write, binary, raw, delayed_write]), + CSumPath = make_checksum_filename(DataDir, Prefix, SequencerNameHack, + FileNum), + {ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]), + seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum, + ?MINIMUM_OFFSET). -seq_append_server_loop(DataDir, Prefix, _File, FH, FileNum, Offset) +seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset) when Offset > ?MAX_FILE_SIZE -> - ok = file:close(FH), + ok = file:close(FHd), + ok = file:close(FHc), info_msg("rollover: ~p server at file ~w offset ~w\n", [Prefix, FileNum, Offset]), run_seq_append_server2(Prefix, DataDir); -seq_append_server_loop(DataDir, Prefix, File, FH, FileNum, Offset) -> +seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> receive - {seq_append, From, Prefix, Chunk} -> - ok = file:pwrite(FH, Offset, Chunk), - From ! {assignment, File, Offset}, - Size = byte_size(Chunk), - seq_append_server_loop(DataDir, Prefix, File, FH, - FileNum, Offset + Size) + {seq_append, From, Prefix, Chunk, CSum} -> + ok = file:pwrite(FHd, Offset, Chunk), + From ! {assignment, Offset, File}, + Len = byte_size(Chunk), + OffsetHex = bin_to_hexstr(<>), + LenHex = bin_to_hexstr(<>), + CSumHex = bin_to_hexstr(CSum), + CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], + ok = file:write(FHc, CSum_info), + seq_append_server_loop(DataDir, Prefix, File, FH_, + FileNum, Offset + Len) after 30*1000 -> + ok = file:close(FHd), + ok = file:close(FHc), info_msg("stop: ~p server at file ~w offset ~w\n", [Prefix, FileNum, Offset]), exit(normal) @@ -291,12 +413,20 @@ make_regname(Prefix) -> make_config_filename(DataDir, Prefix) -> lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])). +make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) -> + lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum", + [DataDir, Prefix, SequencerName, FileNum])). + +make_checksum_filename(DataDir, FileName) -> + lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])). + make_data_filename(DataDir, File) -> FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])), {File, FullPath}. -make_data_filename(DataDir, Prefix, FileNum) -> - File = erlang:iolist_to_binary(io_lib:format("~s.~w", [Prefix, FileNum])), +make_data_filename(DataDir, Prefix, SequencerName, FileNum) -> + File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w", + [Prefix, SequencerName, FileNum])), FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])), {File, FullPath}. @@ -325,38 +455,158 @@ hexstr_to_bin([X,Y|T], Acc) -> {ok, [V], []} = io_lib:fread("~16u", [X,Y]), hexstr_to_bin(T, [V | Acc]). -bin_to_hexstr(Bin) -> - lists:flatten([io_lib:format("~2.16.0B", [X]) || - X <- binary_to_list(Bin)]). +bin_to_hexstr(<<>>) -> + []; +bin_to_hexstr(<>) -> + [hex_digit(X), hex_digit(Y)|bin_to_hexstr(Rest)]. + +hex_digit(X) when X < 10 -> + X + $0; +hex_digit(X) -> + X - 10 + $a. %%%%%%%%%%%%%%%%% %%% escript stuff -main2(["file-write-client", Host, PortStr, BlockSizeStr, PrefixStr, LocalFile]) -> +main2(["file-write-client"]) -> + io:format("Use: Write a local file to a single server.\n"), + io:format("Args: Host Port BlockSize Prefix LocalFile [OutputPath|'console']\n"), + erlang:halt(1); +main2(["file-write-client", Host, PortStr, BlockSizeStr, PrefixStr, LocalFile|Output]) -> Sock = escript_connect(Host, PortStr), BlockSize = list_to_integer(BlockSizeStr), Prefix = list_to_binary(PrefixStr), - escript_upload_file(Sock, BlockSize, Prefix, LocalFile); -main2(["1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile|HPs]) -> + Res = escript_upload_file(Sock, BlockSize, Prefix, LocalFile), + FH = open_output_file(case Output of [] -> "console"; + _ -> Output end), + print_upload_details(FH, Res), + (catch file:close(FH)), + Res; + +main2(["1file-write-redundant-client"]) -> + io:format("Use: Write a local file to a series of servers.\n"), + io:format("Args: BlockSize Prefix LocalFilePath [silent] [Host Port [Host Port ...]]\n"), + erlang:halt(1); +main2(["1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile|HPs0]) -> BlockSize = list_to_integer(BlockSizeStr), Prefix = list_to_binary(PrefixStr), - escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile); + {Out, HPs} = case HPs0 of + ["silent"|Rest] -> {silent, Rest}; + _ -> {not_silent, HPs0} + end, + Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile), + if Out /= silent -> + print_upload_details(user, Res); + true -> + ok + end, + Res; + +main2(["chunk-read-client"]) -> + io:format("Use: Read a series of chunks for a single server.\n"), + io:format("Args: Host Port LocalChunkDescriptionPath [OutputPath|'console']\n"), + erlang:halt(1); main2(["chunk-read-client", Host, PortStr, ChunkFileList]) -> + main2(["chunk-read-client", Host, PortStr, ChunkFileList, "console"]); +main2(["chunk-read-client", Host, PortStr, ChunkFileList, OutputPath]) -> + FH = open_output_file(OutputPath), + OutFun = make_outfun(FH), + try + main2(["chunk-read-client2", Host, PortStr, ChunkFileList, OutFun]) + after + (catch file:close(FH)) + end; +main2(["chunk-read-client2", Host, PortStr, ChunkFileList, ProcFun]) -> Sock = escript_connect(Host, PortStr), - escript_download_chunks(Sock, ChunkFileList); + escript_download_chunks(Sock, ChunkFileList, ProcFun); + +main2(["list-client"]) -> + io:format("Use: List all files on a single server.\n"), + io:format("Args: Host Port [OutputPath]\n"), + erlang:halt(1); main2(["list-client", Host, PortStr]) -> Sock = escript_connect(Host, PortStr), escript_list(Sock); +main2(["list-client", Host, PortStr, OutputPath]) -> + Sock = escript_connect(Host, PortStr), + escript_list(Sock, OutputPath); + +main2(["checksum-list-client"]) -> + io:format("Use: List all chunk sizes & checksums for a single file.\n"), + io:format("Args: [ 'line-by-line' ] Host Port File\n"), + erlang:halt(1); +main2(["checksum-list-client-line-by-line", Host, PortStr, File]) -> + Sock = escript_connect(Host, PortStr), + ProcFun = make_outfun(user), + escript_checksum_list(Sock, File, line_by_line, ProcFun); +main2(["checksum-list-client", Host, PortStr, File]) -> + Sock = escript_connect(Host, PortStr), + ProcFun = make_outfun(user), + escript_checksum_list(Sock, File, fast, ProcFun); + +main2(["delete-client"]) -> + io:format("Use: Delete a file (NOT FOR GENERAL USE)\n"), + io:format("Args: Host Port File\n"), + erlang:halt(1); main2(["delete-client", Host, PortStr, File]) -> Sock = escript_connect(Host, PortStr), escript_delete(Sock, File); + +main2(["server"]) -> + io:format("Use: Run a file server + TCP listen port service.\n"), + io:format("Args: Port DataDirPath\n"), + erlang:halt(1); main2(["server", RegNameStr, PortStr, DataDir]) -> Port = list_to_integer(PortStr), %% application:start(sasl), _Pid1 = start_listen_server(Port, DataDir), _Pid2 = start_append_server(list_to_atom(RegNameStr), DataDir), - receive forever -> ok end. + receive forever -> ok end; + +%%%% cc flavors %%%% + +main2(["cc-1file-write-redundant-client"]) -> + io:format("Use: Write a local file to a chain via projection.\n"), + io:format("Args: BlockSize Prefix LocalFilePath ProjectionPath\n"), + erlang:halt(1); +main2(["cc-1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile, ProjectionPath]) -> + BlockSize = list_to_integer(BlockSizeStr), + Prefix = list_to_binary(PrefixStr), + {_Chain, RawHPs} = calc_chain(write, ProjectionPath, PrefixStr), + HPs = convert_raw_hps(RawHPs), + Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile), + print_upload_details(user, Res), + Res; + +main2(["cc-chunk-read-client"]) -> + io:format("Use: Read a series of chunks from a chain via projection.\n"), + io:format("Args: ProjectionPath ChunkFileList [OutputPath|'console' \\\n\t[ErrorCorrection_ProjectionPath]]\n"), + erlang:halt(1); +main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList]) -> + main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList,"console", + undefined]); +main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath]) -> + main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath, + undefined]); +main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath, + EC_ProjectionPath]) -> + main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath, + EC_ProjectionPath]). + +main3(["cc-chunk-read-client", + ProjectionPathOrDir, ChunkFileList, OutputPath, EC_ProjectionPath]) -> + P = read_projection_file(ProjectionPathOrDir), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + FH = open_output_file(OutputPath), + ProcFun = make_outfun(FH), + Res = try + escript_cc_download_chunks(ChunkFileList, P, ChainMap, ProcFun, + EC_ProjectionPath) + after + (catch file:close(FH)) + end, + Res. escript_connect(Host, PortStr) -> Port = list_to_integer(PortStr), @@ -373,40 +623,41 @@ escript_upload_file(Sock, BlockSize, Prefix, File) -> file:close(FH) end. -escript_upload_file2({ok, Bin}, FH, BlockSize, Prefix, Sock, Acc) -> - {OffsetHex, SizeHex, File} = upload_chunk_append(Sock, Prefix, Bin), - fmt("~s ~s ~s\n", [OffsetHex, SizeHex, File]), +escript_upload_file2({ok, Chunk}, FH, BlockSize, Prefix, Sock, Acc) -> + {OffsetHex, LenHex, File} = upload_chunk_append(Sock, Prefix, Chunk), + verb("~s ~s ~s\n", [OffsetHex, LenHex, File]), <> = hexstr_to_bin(OffsetHex), - <> = hexstr_to_bin(SizeHex), + <> = hexstr_to_bin(LenHex), OSF = {Offset, Size, File}, escript_upload_file2(file:read(FH, BlockSize), FH, BlockSize, Prefix, Sock, [OSF|Acc]); escript_upload_file2(eof, _FH, _BlockSize, _Prefix, _Sock, Acc) -> lists:reverse(Acc). -upload_chunk_append(Sock, Prefix, Bin) -> - %% _ = crypto:hash(md5, Bin), - Size = byte_size(Bin), - SizeHex = list_to_binary(bin_to_hexstr(<>)), - Cmd = <<"A ", SizeHex/binary, " ", Prefix/binary, "\n">>, - ok = gen_tcp:send(Sock, [Cmd, Bin]), +upload_chunk_append(Sock, Prefix, Chunk) -> + %% _ = crypto:hash(md5, Chunk), + Len = byte_size(Chunk), + LenHex = list_to_binary(bin_to_hexstr(<>)), + Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>, + ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, <<"OK ", OffsetHex:16/binary, " ", Path:PathLen/binary, _:1/binary>> = Line, - %% <> = hexstr_to_bin(OffsetHex), - {OffsetHex, SizeHex, Path}. + {OffsetHex, LenHex, Path}. -upload_chunk_write(Sock, Offset, File, Bin) -> - %% _ = crypto:hash(md5, Bin), - Size = byte_size(Bin), +upload_chunk_write(Sock, Offset, File, Chunk) when is_integer(Offset) -> OffsetHex = list_to_binary(bin_to_hexstr(<>)), - SizeHex = list_to_binary(bin_to_hexstr(<>)), + upload_chunk_write(Sock, OffsetHex, File, Chunk); +upload_chunk_write(Sock, OffsetHex, File, Chunk) when is_binary(OffsetHex) -> + %% _ = crypto:hash(md5, Chunk), + Len = byte_size(Chunk), + LenHex = list_to_binary(bin_to_hexstr(<>)), Cmd = <<"W-repl ", OffsetHex/binary, " ", - SizeHex/binary, " ", File/binary, "\n">>, - ok = gen_tcp:send(Sock, [Cmd, Bin]), + LenHex/binary, " ", File/binary, "\n">>, + ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), <<"OK\n">> = Line, - {OffsetHex, SizeHex, File}. + {OffsetHex, LenHex, File}. escript_upload_redundant([Host, PortStr|HPs], BlockSize, Prefix, LocalFile) -> Sock = escript_connect(Host, PortStr), @@ -427,7 +678,7 @@ escript_upload_redundant2([Host, PortStr|HPs], OSFs, LocalFile, OSFs) -> [begin {ok, Chunk} = file:read(FH, Size), _OSF2 = upload_chunk_write(Sock, Offset, File, Chunk) - %% fmt("~p: ~p\n", [{Host, PortStr}, OSF2]) + %% verb("~p: ~p\n", [{Host, PortStr}, OSF2]) end || {Offset, Size, File} <- OSFs] after gen_tcp:close(Sock), @@ -435,55 +686,303 @@ escript_upload_redundant2([Host, PortStr|HPs], OSFs, LocalFile, OSFs) -> end, escript_upload_redundant2(HPs, OSFs, LocalFile, OSFs). -escript_download_chunks(Sock, ChunkFileList) -> +escript_download_chunks(Sock, {{{ChunkLine}}}, ProcFun) -> + escript_download_chunk({ok, ChunkLine}, invalid_fd, Sock, ProcFun); +escript_download_chunks(Sock, ChunkFileList, ProcFun) -> {ok, FH} = file:open(ChunkFileList, [read, raw, binary]), - escript_dowload_chunk(file:read_line(FH), FH, Sock). + escript_download_chunk(file:read_line(FH), FH, Sock, ProcFun). -escript_dowload_chunk({ok, Line}, FH, Sock) -> +escript_download_chunk({ok, Line}, FH, Sock, ProcFun) -> + ChunkOrError = escript_cc_download_chunk2(Sock, Line), + ProcFun(ChunkOrError), + [ChunkOrError| + escript_download_chunk((catch file:read_line(FH)), FH, Sock, ProcFun)]; +escript_download_chunk(eof, _FH, _Sock, ProcFun) -> + ProcFun(eof), + []; +escript_download_chunk(_Else, _FH, _Sock, ProcFun) -> + ProcFun(eof), + []. + +escript_cc_download_chunks({{{ChunkLine}}}, P, ChainMap, ProcFun, + EC_ProjectionPath) -> + escript_cc_download_chunk({ok,ChunkLine}, invalid_fd, P, ChainMap, ProcFun, + EC_ProjectionPath); +escript_cc_download_chunks(ChunkFileList, P, ChainMap, ProcFun, + EC_ProjectionPath) -> + {ok, FH} = file:open(ChunkFileList, [read, raw, binary]), + escript_cc_download_chunk(file:read_line(FH), FH, P, ChainMap, ProcFun, + EC_ProjectionPath). + +escript_cc_download_chunk({ok, Line}, FH, P, ChainMap, ProcFun, + EC_ProjectionPath) -> + RestLen = byte_size(Line) - 16 - 1 - 8 - 1 - 1, + <<_Offset:16/binary, " ", _Len:8/binary, " ", Rest:RestLen/binary, "\n">> + = Line, + Prefix = re:replace(Rest, "\\..*", "", [{return, binary}]), + {_Chains, RawHPs} = calc_chain(read, P, ChainMap, Prefix), + Chunk = lists:foldl( + fun(_RawHP, Bin) when is_binary(Bin) -> Bin; + (RawHP, _) -> + [Host, PortStr] = convert_raw_hps([RawHP]), + Sock = get_cached_sock(Host, PortStr), + case escript_cc_download_chunk2(Sock, Line) of + Bin when is_binary(Bin) -> + Bin; + {error, _} = Error -> + Error; + {erasure_encoded, _} = EC_info -> + escript_cc_download_ec_chunk(EC_info, + EC_ProjectionPath) + end + end, undefined, RawHPs), + ProcFun(Chunk), + [Chunk|escript_cc_download_chunk((catch file:read_line(FH)), + FH, P, ChainMap, ProcFun, + EC_ProjectionPath)]; +escript_cc_download_chunk(eof, _FH, _P, _ChainMap, ProcFun, + _EC_ProjectionPath) -> + ProcFun(eof), + []; +escript_cc_download_chunk(Else, _FH, _P, _ChainMap, ProcFun, + _EC_ProjectionPath) -> + ProcFun(Else), + []. + +escript_cc_download_chunk2(Sock, Line) -> %% Line includes an LF, so we can be lazy. - CmdLF = ["R ", Line], + CmdLF = [<<"R ">>, Line], ok = gen_tcp:send(Sock, CmdLF), - {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 3), - Size = read_hex_size(Line), - {ok, _Chunk} = gen_tcp:recv(Sock, Size), - fmt("ok\n"), - escript_dowload_chunk(file:read_line(FH), FH, Sock); -escript_dowload_chunk(eof, _FH, _Sock) -> - ok. + case gen_tcp:recv(Sock, 3) of + {ok, <<"OK\n">>} -> + {_Offset, Size, _File} = read_hex_size(Line), + {ok, Chunk} = gen_tcp:recv(Sock, Size), + Chunk; + {ok, Else} -> + {ok, OldOpts} = inet:getopts(Sock, [packet]), + ok = inet:setopts(Sock, [{packet, line}]), + {ok, Else2} = gen_tcp:recv(Sock, 0), + ok = inet:setopts(Sock, OldOpts), + case Else of + <<"ERA">> -> + escript_cc_parse_ec_info(Sock, Line, Else2); + _ -> + {error, {Line, <>}} + end + end. + +escript_cc_parse_ec_info(Sock, Line, Else2) -> + ChompLine = chomp(Line), + {Offset, Size, File} = read_hex_size(ChompLine), + <<"SURE ", BodyLenHex:4/binary, " ", StripeWidthHex:16/binary, " ", + OrigFileLenHex:16/binary, " rs_10_4_v1", _/binary>> = Else2, + <> = hexstr_to_bin(BodyLenHex), + {ok, SummaryBody} = gen_tcp:recv(Sock, BodyLen), + + <> = hexstr_to_bin(StripeWidthHex), + <> = hexstr_to_bin(OrigFileLenHex), + NewFileNum = (Offset div StripeWidth) + 1, + NewOffset = Offset rem StripeWidth, + if Offset + Size > OrigFileLen -> + %% Client's request is larger than original file size, derp + {error, bad_offset_and_size}; + NewOffset + Size > StripeWidth -> + %% Client's request straddles a stripe boundary, TODO fix me + {error, todo_TODO_implement_this_with_two_reads_and_then_glue_together}; + true -> + NewOffsetHex = bin_to_hexstr(<>), + LenHex = bin_to_hexstr(<>), + NewSuffix = file_suffix_rs_10_4_v1(NewFileNum), + NewFile = iolist_to_binary([File, NewSuffix]), + NewLine = iolist_to_binary([NewOffsetHex, " ", LenHex, " ", + NewFile, "\n"]), + {erasure_encoded, {Offset, Size, File, NewOffset, NewFile, + NewFileNum, NewLine, SummaryBody}} + end. + +%% TODO: The EC method/version/type stuff here is loosey-goosey +escript_cc_download_ec_chunk(EC_info, undefined) -> + EC_info; +escript_cc_download_ec_chunk({erasure_encoded, + {_Offset, _Size, _File, _NewOffset, NewFile, + NewFileNum, NewLine, SummaryBody}}, + EC_ProjectionPath) -> + {P, ChainMap} = get_cached_projection(EC_ProjectionPath), + %% Remember: we use the whole file name for hashing, not the prefix + {_Chains, RawHPs} = calc_chain(read, P, ChainMap, NewFile), + RawHP = lists:nth(NewFileNum, RawHPs), + [Host, PortStr] = convert_raw_hps([RawHP]), + Sock = get_cached_sock(Host, PortStr), + case escript_cc_download_chunk2(Sock, NewLine) of + Chunk when is_binary(Chunk) -> + Chunk; + {error, _} = Else -> + io:format("TODO: EC chunk get failed:\n\t~s\n", [NewLine]), + io:format("Use this info to reconstruct:\n\t~p\n\n", [SummaryBody]), + Else + end. + +get_cached_projection(EC_ProjectionPath) -> + case get(cached_projection) of + undefined -> + P = read_projection_file(EC_ProjectionPath), + ChainMap = read_chain_map_file(EC_ProjectionPath), + put(cached_projection, {P, ChainMap}), + get_cached_projection(EC_ProjectionPath); + Stuff -> + Stuff + end. + +file_suffix_rs_10_4_v1(1) -> <<"_k01">>; +file_suffix_rs_10_4_v1(2) -> <<"_k02">>; +file_suffix_rs_10_4_v1(3) -> <<"_k03">>; +file_suffix_rs_10_4_v1(4) -> <<"_k04">>; +file_suffix_rs_10_4_v1(5) -> <<"_k05">>; +file_suffix_rs_10_4_v1(6) -> <<"_k06">>; +file_suffix_rs_10_4_v1(7) -> <<"_k07">>; +file_suffix_rs_10_4_v1(8) -> <<"_k08">>; +file_suffix_rs_10_4_v1(9) -> <<"_k09">>; +file_suffix_rs_10_4_v1(10) -> <<"_k10">>. escript_list(Sock) -> + escript_list2(Sock, make_outfun(user)). + +escript_list(Sock, OutputPath) -> +io:format("OutputPath ~p\n", [OutputPath]), + {ok, FH} = file:open(OutputPath, [write]), + try + escript_list2(Sock, make_outfun(FH)) + after + file:close(FH) + end. + +escript_list2(Sock, ProcFun) -> ok = gen_tcp:send(Sock, <<"L\n">>), ok = inet:setopts(Sock, [{packet, line}]), {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0), - Res = escript_list(gen_tcp:recv(Sock, 0), Sock), + Res = escript_list2(gen_tcp:recv(Sock, 0), Sock, ProcFun), ok = inet:setopts(Sock, [{packet, raw}]), Res. -escript_list({ok, <<".\n">>}, _Sock) -> +escript_list2({ok, <<".\n">>}, _Sock, ProcFun) -> + ProcFun(eof), []; -escript_list({ok, Line}, Sock) -> - fmt("~s", [Line]), - [Line|escript_list(gen_tcp:recv(Sock, 0), Sock)]; -escript_list(Else, _Sock) -> - fmt("ERROR: ~p\n", [Else]), +escript_list2({ok, Line}, Sock, ProcFun) -> + ProcFun(Line), + [Line|escript_list2(gen_tcp:recv(Sock, 0), Sock, ProcFun)]; +escript_list2(Else, _Sock, ProcFun) -> + ProcFun(io_lib:format("ERROR: ~p\n", [Else])), {error, Else}. +escript_checksum_list(Sock, File, Method, ProcFun) -> + ok = gen_tcp:send(Sock, [<<"C ">>, File, <<"\n">>]), + ok = inet:setopts(Sock, [{packet, line}]), + case gen_tcp:recv(Sock, 0) of + {ok, <<"OK ", Rest/binary>> = Line} -> + put(status, ok), % may be unset later + ProcFun(Line), + if Method == line_by_line -> + escript_checksum_list_line_by_line(Sock, ProcFun); + Method == fast -> + RestLen = byte_size(Rest) - 1, + <> = Rest, + <> = hexstr_to_bin(LenHex), + ok = inet:setopts(Sock, [{packet, raw}]), + escript_checksum_list_fast(Sock, Len, ProcFun) + end; + {ok, _BAH} -> + put(status, error), + io:format("ERROR: ~p\n", [_BAH]) + end. + +escript_checksum_list_line_by_line(Sock, ProcFun) -> + case gen_tcp:recv(Sock, 0) of + {ok, <<".\n">> = Line} -> + ProcFun(Line), + ok; + {ok, Line} -> + ProcFun(Line), + escript_checksum_list_line_by_line(Sock, ProcFun) + end. + +escript_checksum_list_fast(Sock, 0, ProcFun) -> + {ok, <<".\n">> = Line} = gen_tcp:recv(Sock, 2), + ProcFun(Line), + ok; +escript_checksum_list_fast(Sock, Remaining, ProcFun) -> + Num = erlang:min(Remaining, 1024*1024), + {ok, Bytes} = gen_tcp:recv(Sock, Num), + ProcFun(Bytes), + escript_checksum_list_fast(Sock, Remaining - byte_size(Bytes), ProcFun). + escript_delete(Sock, File) -> ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of {ok, <<"OK\n">>} -> ok; - {ok, <<"ERROR\n">>} -> + {ok, <<"ERROR", _/binary>>} -> error end. -fmt(Fmt) -> - fmt(Fmt, []). +escript_compare_servers(Sock1, Sock2, H1, H2, Args) -> + FileFilterFun = fun(_) -> true end, + escript_compare_servers(Sock1, Sock2, H1, H2, FileFilterFun, Args). -fmt(Fmt, Args) -> - case application:get_env(kernel, verbose) of {ok, false} -> ok; - _ -> io:format(Fmt, Args) +escript_compare_servers(Sock1, Sock2, H1, H2, FileFilterFun, Args) -> + All = [H1, H2], + put(mydict, dict:new()), + Fetch1 = make_fetcher(H1), + Fetch2 = make_fetcher(H2), + + Fmt = case Args of + [] -> + fun(eof) -> ok; (Str) -> io:format(user, Str, []) end; + [null] -> + fun(_) -> ok end; + [OutFile] -> + {ok, FH} = file:open(OutFile, [write]), + fun(eof) -> file:close(FH); + (Str) -> file:write(FH, Str) + end + end, + + _X1 = escript_list2(Sock1, Fetch1), + _X2 = escript_list2(Sock2, Fetch2), + FoldRes = lists:sort(dict:to_list(get(mydict))), + Fmt("{legend, {file, list_of_servers_without_file}}.\n"), + Fmt(io_lib:format("{all, ~p}.\n", [All])), + Res = [begin + {GotIt, Sizes} = lists:unzip(GotSizes), + Size = lists:max(Sizes), + Missing = {File, {Size, All -- GotIt}}, + verb("~p.\n", [Missing]), + Missing + end || {File, GotSizes} <- FoldRes, FileFilterFun(File)], + (catch Fmt(eof)), + Res. + +make_fetcher(Host) -> + fun(eof) -> + ok; + (<>) -> + <> = hexstr_to_bin(SizeHex), + FileLen = byte_size(Rest) - 1, + <> = Rest, + NewDict = dict:append(File, {Host, Size}, get(mydict)), + put(mydict, NewDict) + end. + +checksum(Bin) when is_binary(Bin) -> + crypto:hash(md5, Bin). + +verb(Fmt) -> + verb(Fmt, []). + +verb(Fmt, Args) -> + case application:get_env(kernel, verbose) of + {ok, true} -> io:format(Fmt, Args); + _ -> ok end. info_msg(Fmt, Args) -> @@ -491,6 +990,400 @@ info_msg(Fmt, Args) -> _ -> error_logger:info_msg(Fmt, Args) end. +repair(File, Size, [], Mode, V, SrcS, SrcS2, DstS, DstS2, _Src) -> + verb("~s: present on both: ", [File]), + repair_both_present(File, Size, Mode, V, SrcS, SrcS2, DstS, DstS2); +repair(File, Size, MissingList, Mode, V, SrcS, SrcS2, DstS, _DstS2, Src) -> + case lists:member(Src, MissingList) of + true -> + verb("~s -> ~p, skipping: not on source server\n", [File, MissingList]); + false when Mode == check -> + verb("~s -> ~p, copy ~s MB (skipped)\n", [File, MissingList, mbytes(Size)]); + false -> + verb("~s -> ~p, copy ~s MB ", [File, MissingList, mbytes(Size)]), + ok = copy_file(File, SrcS, SrcS2, DstS, V), + verb("done\n", []) + end. + +copy_file(File, SrcS, SrcS2, DstS, Verbose) -> + %% Use the *second* source socket to copy each chunk. + ProcChecksum = copy_file_proc_checksum_fun(File, SrcS2, DstS, Verbose), + %% Use the *first source socket to enumerate the chunks & checksums. + escript_checksum_list(SrcS, File, line_by_line, ProcChecksum). + +copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) -> + fun(<>) -> + <> = hexstr_to_bin(LenHex), + DownloadChunkBin = <>, + [Chunk] = escript_download_chunks(SrcS, {{{DownloadChunkBin}}}, + fun(_) -> ok end), + CSum = hexstr_to_bin(CSumHex), + CSum2 = checksum(Chunk), + if Len == byte_size(Chunk), CSum == CSum2 -> + {_,_,_} = upload_chunk_write(DstS, OffsetHex, File, Chunk), + ok; + true -> + io:format("ERROR: ~s ~s ~s csum/size error\n", + [File, OffsetHex, LenHex]), + error + end; + (_Else) -> + ok + end. + +repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> + Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))), + Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))), + J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))), + J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))), + J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))), + S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))), + {ok, FH1} = file:open(Tmp1, [write, raw, binary]), + {ok, FH2} = file:open(Tmp2, [write, raw, binary]), + try + K = md5_ctx, + MD5_it = fun(Bin) -> + {FH, MD5ctx1} = get(K), + file:write(FH, Bin), + MD5ctx2 = crypto:hash_update(MD5ctx1, Bin), + put(K, {FH, MD5ctx2}) + end, + put(K, {FH1, crypto:hash_init(md5)}), + ok = escript_checksum_list(SrcS, File, fast, MD5_it), + {_, MD5_1} = get(K), + SrcMD5 = crypto:hash_final(MD5_1), + put(K, {FH2, crypto:hash_init(md5)}), + ok = escript_checksum_list(DstS, File, fast, MD5_it), + {_, MD5_2} = get(K), + DstMD5 = crypto:hash_final(MD5_2), + if SrcMD5 == DstMD5 -> + verb("identical\n", []); + true -> + ok = file:close(FH1), + ok = file:close(FH2), + _Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical), + case file:read_file_info(S_Identical) of + {ok, _} -> + verb("identical (secondary sort)\n", []); + {error, enoent} -> + io:format("differences found:"), + repair_both(File, Size, V, Mode, + J_Both, J_SrcOnly, J_DstOnly, + SrcS, DstS) + end + end + after + catch file:close(FH1), + catch file:close(FH2), + [(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly, + S_Identical]] + end. + +repair_both(File, _Size, V, Mode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) -> + AccFun = if Mode == check -> + fun(_X, List) -> List end; + Mode == repair -> + fun( X, List) -> [X|List] end + end, + BothFun = fun(<<_OffsetSrcHex:16/binary, " ", + LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ", + LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line, + {SameB, SameC, DiffB, DiffC, Ds}) -> + <> = hexstr_to_bin(LenSrcHex), + if LenSrcHex == LenDstHex, + CSumSrcHex == CSumDstHex -> + {SameB + Len, SameC + 1, DiffB, DiffC, Ds}; + true -> + %% D = {OffsetSrcHex, LenSrcHex, ........ + {SameB, SameC, DiffB + Len, DiffC + 1, + AccFun(Line, Ds)} + end; + (_Else, Acc) -> + Acc + end, + OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ", + _CSumHex:32/binary, "\n">> = Line, + {DiffB, DiffC, Ds}) -> + <> = hexstr_to_bin(LenSrcHex), + {DiffB + Len, DiffC + 1, AccFun(Line, Ds)}; + (_Else, Acc) -> + Acc + end, + {SameBx, SameCx, DiffBy, DiffCy, BothDiffs} = + file_folder(BothFun, {0,0,0,0,[]}, J_Both), + {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly), + {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly), + if Mode == check orelse V == true -> + io:format("\n\t"), + io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]), + io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]), + io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]), + io:format("\n"); + true -> ok + end, + if Mode == repair -> + ok = repair_both_both(File, V, BothDiffs, SrcS, DstS), + ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src, + SrcS, DstS), + ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst, + DstS, SrcS); + true -> + ok + end. + +repair_both_both(_File, _V, [_|_], _SrcS, _DstS) -> + %% TODO: fetch both, check checksums, hopefully only exactly one + %% is correct, then use that one to repair the other. And if the + %% sizes are different, hrm, there may be an extra corner case(s) + %% hiding there. + io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"), + timer:sleep(3*1000), + ok; +repair_both_both(_File, _V, [], _SrcS, _DstS) -> + ok. + +repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) -> + ok; +repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) -> + verb("\n", []), + verb("Starting copy of ~p chunks/~s MBytes to \n ~s: ", + [DiffCount, mbytes(DiffBytes), File]), + InnerCopyFun = copy_file_proc_checksum_fun(File, SrcS, DstS, V), + FoldFun = fun(Line, ok) -> + ok = InnerCopyFun(Line) % Strong sanity check + end, + ok = lists:foldl(FoldFun, ok, ToBeCopied), + verb(" done\n", []), + ok. + +file_folder(Fun, Acc, Path) -> + {ok, FH} = file:open(Path, [read, raw, binary]), + try + file_folder2(Fun, Acc, FH) + after + file:close(FH) + end. + +file_folder2(Fun, Acc, FH) -> + file_folder2(file:read_line(FH), Fun, Acc, FH). + +file_folder2({ok, Line}, Fun, Acc, FH) -> + Acc2 = Fun(Line, Acc), + file_folder2(Fun, Acc2, FH); +file_folder2(eof, _Fun, Acc, _FH) -> + Acc. + +make_repair_props(["check"|T]) -> + [{mode, check}|make_repair_props(T)]; +make_repair_props(["repair"|T]) -> + [{mode, repair}|make_repair_props(T)]; +make_repair_props(["verbose"|T]) -> + application:set_env(kernel, verbose, true), + [{verbose, true}|make_repair_props(T)]; +make_repair_props(["noverbose"|T]) -> + [{verbose, false}|make_repair_props(T)]; +make_repair_props(["progress"|T]) -> + [{progress, true}|make_repair_props(T)]; +make_repair_props(["delete-source"|T]) -> + [{delete_source, true}|make_repair_props(T)]; +make_repair_props(["nodelete-source"|T]) -> + [{delete_source, false}|make_repair_props(T)]; +make_repair_props(["nodelete-tmp"|T]) -> + [{delete_tmp, false}|make_repair_props(T)]; +make_repair_props([X|T]) -> + io:format("Error: skipping unknown option ~p\n", [X]), + make_repair_props(T); +make_repair_props([]) -> + %% Proplist defaults + [{mode, check}, {delete_source, false}]. + +mbytes(0) -> + "0.0"; +mbytes(Size) -> + lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])). + +chomp(Line) when is_binary(Line) -> + LineLen = byte_size(Line) - 1, + <> = Line, + ChompLine. + +make_outfun(FH) -> + fun({error, _} = Error) -> + file:write(FH, io_lib:format("Error: ~p\n", [Error])); + (eof) -> + ok; + ({erasure_encoded, Info} = _Erasure) -> + file:write(FH, "TODO/WIP: erasure_coded:\n"), + file:write(FH, io_lib:format("\t~p\n", [Info])); + (Bytes) when is_binary(Bytes) orelse is_list(Bytes) -> + file:write(FH, Bytes) + end. + +open_output_file("console") -> + user; +open_output_file(Path) -> + {ok, FH} = file:open(Path, [write]), + FH. + +print_upload_details(_, {error, _} = Res) -> + io:format("Error: ~p\n", [Res]), + erlang:halt(1); +print_upload_details(FH, Res) -> + [io:format(FH, "~s ~s ~s\n", [bin_to_hexstr(<>), + bin_to_hexstr(<>), + File]) || + {Offset, Len, File} <- Res]. + +%%%%%%%%%%%%%%%%% + +read_projection_file("new") -> + #projection{epoch=0, last_epoch=0, + float_map=undefined, last_float_map=undefined}; +read_projection_file(Path) -> + case filelib:is_dir(Path) of + true -> + read_projection_file_loop(Path ++ "/current.proj"); + false -> + case filelib:is_file(Path) of + true -> + read_projection_file2(Path); + false -> + error({bummer, Path}) + end + end. + +read_projection_file2(Path) -> + {ok, [P]} = file:consult(Path), + true = is_record(P, projection), + FloatMap = P#projection.float_map, + LastFloatMap = if P#projection.last_float_map == undefined -> + FloatMap; + true -> + P#projection.last_float_map + end, + P#projection{migrating=(FloatMap /= LastFloatMap), + tree=szone_chash:make_tree(FloatMap), + last_tree=szone_chash:make_tree(LastFloatMap)}. + +read_projection_file_loop(Path) -> + read_projection_file_loop(Path, 100). + +read_projection_file_loop(Path, 0) -> + error({bummer, Path}); +read_projection_file_loop(Path, N) -> + try + read_projection_file2(Path) + catch + error:{badmatch,{error,enoent}} -> + timer:sleep(100), + read_projection_file_loop(Path, N-1) + end. + +write_projection(P, Path) when is_record(P, projection) -> + {error, enoent} = file:read_file_info(Path), + {ok, FH} = file:open(Path, [write]), + WritingP = P#projection{tree=undefined, last_tree=undefined}, + io:format(FH, "~p.\n", [WritingP]), + ok = file:close(FH). + +read_weight_map_file(Path) -> + {ok, [Map]} = file:consult(Path), + true = is_list(Map), + true = lists:all(fun({Chain, Weight}) + when is_binary(Chain), + is_integer(Weight), Weight >= 0 -> + true; + (_) -> + false + end, Map), + Map. + +%% Assume the file "chains.map" in whatever dir that stores projections. +read_chain_map_file(DirPath) -> + L = case filelib:is_dir(DirPath) of + true -> + {ok, Map} = file:consult(DirPath ++ "/chains.map"), + Map; + false -> + Dir = filename:dirname(DirPath), + {ok, Map} = file:consult(Dir ++ "/chains.map"), + Map + end, + orddict:from_list(L). + +get_float_map(P) when is_record(P, projection) -> + P#projection.float_map. + +get_last_float_map(P) when is_record(P, projection) -> + P#projection.last_float_map. + +hash_and_query(Key, P) when is_record(P, projection) -> + <> = crypto:hash(sha, Key), + Float = Int / ?SHA_MAX, + {_, Current} = szone_chash:query_tree(Float, P#projection.tree), + if P#projection.migrating -> + {_, Last} = szone_chash:query_tree(Float, P#projection.last_tree), + if Last == Current -> + [Current]; + true -> + [Current, Last, Current] + end; + true -> + [Current] + end. + +calc_chain(write=Op, ProjectionPathOrDir, PrefixStr) -> + P = read_projection_file(ProjectionPathOrDir), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + calc_chain(Op, P, ChainMap, PrefixStr); +calc_chain(read=Op, ProjectionPathOrDir, PrefixStr) -> + P = read_projection_file(ProjectionPathOrDir), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + calc_chain(Op, P, ChainMap, PrefixStr). + +calc_chain(write=_Op, P, ChainMap, PrefixStr) -> + %% Writes are easy: always use the new location. + [Chain|_] = hash_and_query(PrefixStr, P), + {Chain, orddict:fetch(Chain, ChainMap)}; +calc_chain(read=_Op, P, ChainMap, PrefixStr) -> + %% Reads are slightly trickier: reverse each chain so tail is tried first. + Chains = hash_and_query(PrefixStr, P), + {Chains, lists:flatten([lists:reverse(orddict:fetch(Chain, ChainMap)) || + Chain <- Chains])}. + +convert_raw_hps([{HostBin, Port}|T]) -> + [binary_to_list(HostBin), integer_to_list(Port)|convert_raw_hps(T)]; +convert_raw_hps([]) -> + []. + +get_cached_sock(Host, PortStr) -> + K = {socket_cache, Host, PortStr}, + case erlang:get(K) of + undefined -> + Sock = escript_connect(Host, PortStr), + Krev = {socket_cache_rev, Sock}, + erlang:put(K, Sock), + erlang:put(Krev, {Host, PortStr}), + Sock; + Sock -> + Sock + end. + +invalidate_cached_sock(Sock) -> + (catch gen_tcp:close(Sock)), + Krev = {socket_cache_rev, Sock}, + case erlang:get(Krev) of + undefined -> + ok; + {Host, PortStr} -> + K = {socket_cache, Host, PortStr}, + erlang:erase(Krev), + erlang:erase(K), + ok + end. + %%%%%%%%%%%%%%%%% %%% basho_bench callbacks @@ -499,7 +1392,13 @@ info_msg(Fmt, Args) -> -define(DEFAULT_HOSTIP_LIST, [{{127,0,0,1}, 7071}]). -record(bb, { - sock + host, + port_str, + %% sock, + proj_check_ticker_started=false, + proj_path, + proj, + chain_map }). new(1 = Id) -> @@ -507,8 +1406,8 @@ new(1 = Id) -> case basho_bench_config:get(file0_start_listener, no) of no -> ok; - Port -> - start_listen_server(Port) + {Port, DataDir} -> + start_listen_server(Port, DataDir) end, timer:sleep(100), new_common(Id); @@ -516,12 +1415,16 @@ new(Id) -> new_common(Id). new_common(Id) -> + random:seed(now()), + ProjectionPathOrDir = + basho_bench_config:get(file0_projection_path, undefined), + Servers = basho_bench_config:get(file0_ip_list, ?DEFAULT_HOSTIP_LIST), NumServers = length(Servers), {Host, Port} = lists:nth((Id rem NumServers) + 1, Servers), - SockOpts = [{mode, binary}, {packet, raw}, {active, false}], - {ok, Sock} = gen_tcp:connect(Host, Port, SockOpts), - {ok, #bb{sock=Sock}}. + State0 = #bb{host=Host, port_str=integer_to_list(Port), + proj_path=ProjectionPathOrDir}, + {ok, read_projection_info(State0)}. run(null, _KeyGen, _ValueGen, State) -> {ok, State}; @@ -537,22 +1440,107 @@ run(append_local_server, KeyGen, ValueGen, State) -> run(append_remote_server, KeyGen, ValueGen, State) -> Prefix = KeyGen(), Value = ValueGen(), - {_, _, _} = upload_chunk_append(State#bb.sock, Prefix, Value), - {ok, State}; + bb_do_write_chunk(Prefix, Value, State#bb.host, State#bb.port_str, State); +run(cc_append_remote_server, KeyGen, ValueGen, State0) -> + State = check_projection_check(State0), + Prefix = KeyGen(), + Value = ValueGen(), + {_Chain, ModHPs} = calc_chain(write, State#bb.proj, State#bb.chain_map, + Prefix), + FoldFun = fun({Host, PortStr}, Acc) -> + case bb_do_write_chunk(Prefix, Value, Host, PortStr, + State) of + {ok, _} -> + Acc + 1; + _ -> + Acc + end + end, + case lists:foldl(FoldFun, 0, ModHPs) of + N when is_integer(N), N > 0 -> + {ok, State}; + 0 -> + {error, oh_some_problem_yo, State} + end; run(read_raw_line_local, KeyGen, _ValueGen, State) -> - {RawLine, Size} = setup_read_raw_line(KeyGen), - ok = gen_tcp:send(State#bb.sock, [RawLine, <<"\n">>]), - {ok, <<"OK\n">>} = gen_tcp:recv(State#bb.sock, 3), - {ok, _Chunk} = gen_tcp:recv(State#bb.sock, Size), + {RawLine, Size, _File} = setup_read_raw_line(KeyGen), + bb_do_read_chunk(RawLine, Size, State#bb.host, State#bb.port_str, State); +run(cc_read_raw_line_local, KeyGen, _ValueGen, State0) -> + State = check_projection_check(State0), + {RawLine, Size, File} = setup_read_raw_line(KeyGen), + Prefix = re:replace(File, "\\..*", "", [{return, binary}]), + {_Chain, ModHPs} = calc_chain(read, State#bb.proj, State#bb.chain_map, + Prefix), + FoldFun = fun(_, {ok, _}=Acc) -> + Acc; + ({Host, PortStr}, _Acc) -> + bb_do_read_chunk(RawLine, Size, Host, PortStr, State) + end, + lists:foldl(FoldFun, undefined, ModHPs). + +bb_do_read_chunk(RawLine, Size, Host, PortStr, State) -> + try + Sock = get_cached_sock(Host, PortStr), + try + ok = gen_tcp:send(Sock, [RawLine, <<"\n">>]), + read_chunk(Sock, Size, State) + catch X2:Y2 -> + invalidate_cached_sock(Sock), + {error, {X2,Y2}, State} + end + catch X:Y -> + {error, {X,Y}, State} + end. + +bb_do_write_chunk(Prefix, Value, Host, PortStr, State) -> + try + Sock = get_cached_sock(Host, PortStr), + try + {_, _, _} = upload_chunk_append(Sock, Prefix, Value), + {ok, State} + catch X2:Y2 -> + invalidate_cached_sock(Sock), + {error, {X2,Y2}, State} + end + catch X:Y -> + {error, {X,Y}, State} + end. + +read_chunk(Sock, Size, State) -> + {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 3), + {ok, _Chunk} = gen_tcp:recv(Sock, Size), {ok, State}. setup_read_raw_line(KeyGen) -> RawLine = KeyGen(), <<"R ", Rest/binary>> = RawLine, - Size = read_hex_size(Rest), - {RawLine, Size}. + {_Offset, Size, File} = read_hex_size(Rest), + {RawLine, Size, File}. read_hex_size(Line) -> - <<_Offset:16/binary, " ", SizeBin:8/binary, _/binary>> = Line, - <> = hexstr_to_bin(SizeBin), - Size. + <> = Line, + <> = hexstr_to_bin(OffsetHex), + <> = hexstr_to_bin(SizeHex), + {Offset, Size, File}. + +read_projection_info(#bb{proj_path=undefined}=State) -> + State; +read_projection_info(#bb{proj_path=ProjectionPathOrDir}=State) -> + Proj = read_projection_file(ProjectionPathOrDir), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + ModChainMap = + [{Chain, [{binary_to_list(Host), integer_to_list(Port)} || + {Host, Port} <- Members]} || + {Chain, Members} <- ChainMap], + State#bb{proj=Proj, chain_map=ModChainMap}. + +check_projection_check(#bb{proj_check_ticker_started=false} = State) -> + timer:send_interval(5*1000 - random:uniform(500), projection_check), + check_projection_check(State#bb{proj_check_ticker_started=true}); +check_projection_check(#bb{proj_check_ticker_started=true} = State) -> + receive + projection_check -> + read_projection_info(State) + after 0 -> + State + end. diff --git a/prototype/demo-day-hack/file0_1file_write_redundant.escript b/prototype/demo-day-hack/file0_1file_write_redundant.escript index b4082a5..ab47fb3 100644 --- a/prototype/demo-day-hack/file0_1file_write_redundant.escript +++ b/prototype/demo-day-hack/file0_1file_write_redundant.escript @@ -24,6 +24,7 @@ -module(file0_1file_write_redundant_client). -compile(export_all). +-mode(compile). -define(NO_MODULE, true). -include("./file0.erl"). diff --git a/prototype/demo-day-hack/file0_cc_1file_write_redundant.escript b/prototype/demo-day-hack/file0_cc_1file_write_redundant.escript new file mode 100644 index 0000000..c7389b6 --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_1file_write_redundant.escript @@ -0,0 +1,32 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_cc_1file_write_redundant_client). +-compile(export_all). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main(Args) -> + main2(["cc-1file-write-redundant-client" | Args]). diff --git a/prototype/demo-day-hack/file0_cc_ec_encode.escript b/prototype/demo-day-hack/file0_cc_ec_encode.escript new file mode 100644 index 0000000..e15b29e --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_ec_encode.escript @@ -0,0 +1,176 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell -pz . + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_cc_ec_encode). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +-include_lib("kernel/include/file.hrl"). + +-define(ENCODER, "./jerasure.`uname -s`/bin/enc-dec-wrapper.sh encoder"). +-define(ENCODER_RS_10_4_ARGS, "10 4 cauchy_good 8 10240 102400"). + +main([]) -> + io:format("Use: Erasure code a file and store all sub-chunks to a different projection (specifically for EC use)\n"), + io:format("Args: RegularProjection LocalDataPath EncodingTmpDir EC_Projection [verbose | check | repair | progress | delete-source | nodelete-tmp ]\n"), + erlang:halt(1); +main([RegularProjection, LocalDataPath, TmpDir0, EcProjection|Args]) -> + Ps = make_repair_props(Args), + TmpDir = TmpDir0 ++ "/mytmp." ++ os:getpid(), + %% Hash on entire file name, *not* prefix + FileStr = filename:basename(LocalDataPath), + {_Chain, StripeMembers0} = calc_chain(write, EcProjection, FileStr), + StripeMembers = [{binary_to_list(Host), integer_to_list(Port)} || + {Host, Port} <- StripeMembers0], + verb("Stripe members: ~p\n", [StripeMembers]), + os:cmd("rm -rf " ++ TmpDir), + filelib:ensure_dir(TmpDir ++ "/unused"), + + try + {Ks, Ms, Summary} = encode_rs_10_4(TmpDir, LocalDataPath), + verb("Work directory: ~s\n", [TmpDir]), + verb("Summary header:\n~s\n", [iolist_to_binary(Summary)]), + verb("Data files:\n"), + [verb(" ~s\n", [F]) || F <- Ks], + verb("Parity files:\n"), + [verb(" ~s\n", [F]) || F <- Ms], + case proplists:get_value(mode, Ps) of + repair -> + verb("Writing stripes to remote servers: "), + ok = write_rs_10_4_stripes(lists:zip(Ks ++ Ms, StripeMembers), + Ps), + verb("done\n"), + File = list_to_binary(filename:basename(LocalDataPath)), + ok = write_ec_summary(RegularProjection, File, Summary, Ps); + _ -> + ok + end + after + case proplists:get_value(delete_tmp, Ps) of + false -> + io:format("Not deleting data in dir ~s\n", + [TmpDir]); + _ -> + os:cmd("rm -rf " ++ TmpDir) + end + end, + erlang:halt(0). + +write_rs_10_4_stripes([], _Ps) -> + ok; +write_rs_10_4_stripes([{Path, {Host, PortStr}}|T], Ps) -> + verb("."), + Sock = escript_connect(Host, PortStr), + try + File = list_to_binary(filename:basename(Path)), + ok = write_stripe_file(Path, File, Sock), + write_rs_10_4_stripes(T, Ps) + after + ok = gen_tcp:close(Sock) + end. + +write_stripe_file(Path, File, Sock) -> + {ok, FH} = file:open(Path, [raw, binary, read]), + try + ok = write_stripe_file(0, file:read(FH, 1024*1024), + FH, Path, File, Sock) + after + file:close(FH) + end. + + +write_stripe_file(Offset, {ok, Chunk}, FH, Path, File, Sock) -> + %% OffsetHex = bin_to_hexstr(<>), + %% Len = byte_size(Chunk), + %% LenHex = bin_to_hexstr(<>), + {_,_,_} = upload_chunk_write(Sock, Offset, File, Chunk), + write_stripe_file(Offset + byte_size(Chunk), file:read(FH, 1024*1024), + FH, Path, File, Sock); +write_stripe_file(_Offset, eof, _FH, _Path, _File, _Sock) -> + ok. + +encode_rs_10_4(TmpDir, LocalDataPath) -> + Cmd = ?ENCODER ++ " " ++ TmpDir ++ " " ++ LocalDataPath ++ " " ++ + ?ENCODER_RS_10_4_ARGS, + case os:cmd(Cmd) of + "0\n" -> + Ks = [TmpDir ++ "/" ++ X || X <- filelib:wildcard("*_k??", TmpDir)], + Ms = [TmpDir ++ "/" ++ X || X <- filelib:wildcard("*_m??", TmpDir)], + [Meta] = filelib:wildcard("*_meta.txt", TmpDir), + {ok, MetaBin} = file:read_file(TmpDir ++ "/" ++ Meta), + Summary = make_ec_summary(rs_10_4_v1, MetaBin, Ks, Ms), + {Ks, Ms, Summary}; + Else -> + io:format("XX ~s\n", [Else]), + {error, Else} + end. + +make_ec_summary(rs_10_4_v1, MetaBin, Ks, _Ms) -> + [Meta1Path, OrigFileLenStr|Metas] = + string:tokens(binary_to_list(MetaBin), [10]), + Meta1 = filename:basename(Meta1Path), + OrigFileLen = list_to_integer(OrigFileLenStr), + {ok, FI} = file:read_file_info(hd(Ks)), + StripeWidth = FI#file_info.size, + Body = iolist_to_binary([string:join([Meta1,OrigFileLenStr|Metas], "\n"), + "\n"]), + BodyLen = byte_size(Body), + Hdr1 = iolist_to_binary(["a ", bin_to_hexstr(<>), + " ", + bin_to_hexstr(<>), + " ", + bin_to_hexstr(<>), + " ", + "rs_10_4_v1" + ]), + SpacePadding = 80 - 1 - byte_size(Hdr1), + Hdr2 = lists:duplicate(SpacePadding, 32), + [Hdr1, Hdr2, 10, Body]. + +write_ec_summary(ProjectionPath, File, Summary, _Ps) -> + Prefix = re:replace(File, "\\..*", "", [{return, binary}]), + {_Chain, RawHPs} = calc_chain(write, ProjectionPath, Prefix), + HP_tuples = [{binary_to_list(Host), integer_to_list(Port)} || + {Host, Port} <- RawHPs], + verb("Writing coding summary records: "), + [begin + verb("<"), + Sock = escript_connect(Host, Port), + try + {_,_,_} = upload_chunk_write(Sock, 0, File, + iolist_to_binary(Summary)), + ok = gen_tcp:send(Sock, [<<"TRUNC-hack--- ">>, File, <<"\n">>]), + inet:setopts(Sock, [packet, line]), + {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0) + after + verb(">"), + gen_tcp:close(Sock) + end + end || {Host, Port} <- HP_tuples], + verb(" done\n"), + ok. + diff --git a/prototype/demo-day-hack/file0_cc_make_projection.escript b/prototype/demo-day-hack/file0_cc_make_projection.escript new file mode 100644 index 0000000..8c6920f --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_make_projection.escript @@ -0,0 +1,56 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell -pz . + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_cc_make_projection). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +%% LastProjFile :: "new" : "/path/to/projection/file.proj" +%% NewWeightMapFile :: "/path/to/weight/map/file.weight" +%% NewProjectionPath :: Output file path, ".proj" suffix is recommended + +main([]) -> + io:format("Use: Make a projection description file.\n"), + io:format("Args: 'new'|ProjectionPath File.weight NewProjectionPath\n"), + erlang:halt(1); +main([LastProjPath, NewWeightMapPath, NewProjectionPath]) -> + LastP = read_projection_file(LastProjPath), + LastFloatMap = get_float_map(LastP), + NewWeightMap = read_weight_map_file(NewWeightMapPath), + io:format("LastFloatMap: ~p\n", [LastFloatMap]), + NewFloatMap = if LastFloatMap == undefined -> + szone_chash:make_float_map(NewWeightMap); + true -> + szone_chash:make_float_map(LastFloatMap, NewWeightMap) + end, + io:format("NewFloatMap: ~p\n", [NewFloatMap]), + + NewP = #projection{epoch=LastP#projection.epoch + 1, + last_epoch=LastP#projection.epoch, + float_map=NewFloatMap, + last_float_map=LastFloatMap}, + ok = write_projection(NewP, NewProjectionPath). diff --git a/prototype/demo-day-hack/file0_cc_map_prefix.escript b/prototype/demo-day-hack/file0_cc_map_prefix.escript new file mode 100644 index 0000000..45ee8b4 --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_map_prefix.escript @@ -0,0 +1,54 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell -pz . + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_cc_map_prefix). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +%% Map a prefix into a projection. +%% Return a list of host/ip pairs, one per line. If migrating, +%% and if the current & last projections do not match, then the +%% servers for new, old, & new will be given. +%% The # of output lines can be limited with an optional 3rd arg. + +main([]) -> + io:format("Use: Map a file prefix to a chain via projection.\n"), + io:format("Args: ProjectionPath Prefix [MaxReturnResults]\n"), + erlang:halt(1); +main([ProjectionPathOrDir, Prefix]) -> + main([ProjectionPathOrDir, Prefix, "3"]); +main([ProjectionPathOrDir, Prefix, MaxNumStr]) -> + P = read_projection_file(ProjectionPathOrDir), + Chains = lists:sublist(hash_and_query(Prefix, P), + list_to_integer(MaxNumStr)), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + [io:format("~s ~s ~w\n", [Chain, Host, Port]) || + Chain <- Chains, + {Host, Port} <- orddict:fetch(Chain, ChainMap) + ]. + + diff --git a/prototype/demo-day-hack/file0_cc_migrate_files.escript b/prototype/demo-day-hack/file0_cc_migrate_files.escript new file mode 100644 index 0000000..9c0ad34 --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_migrate_files.escript @@ -0,0 +1,100 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_cc_migrate_files). +-compile(export_all). +-mode(compile). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: Migrate files from old chain to new chain via projection.\n"), + io:format("Args: ProjectionPath Host Port [ verbose check repair delete-source noverbose nodelete-source ]\n"), + erlang:halt(1); +main([ProjectionPathOrDir, Host, PortStr|Args]) -> + Ps = make_repair_props(Args), + P = read_projection_file(ProjectionPathOrDir), + ChainMap = read_chain_map_file(ProjectionPathOrDir), + + SrcS = escript_connect(Host, PortStr), + SrcS2 = escript_connect(Host, PortStr), + Items = escript_list2(SrcS, fun(_) -> ok end), + process_files(Items, SrcS, SrcS2, P, ChainMap, Ps). + +process_files([], _SrcS, _SrcS2, _P, _ChainMap, _Ps) -> + verb("Done\n"), + ok; +process_files([Line|Items], SrcS, SrcS2, P, ChainMap, Ps) -> + FileLen = byte_size(Line) - 16 - 1 - 1, + <> = Line, + Size = binary_to_integer(SizeHex, 16), + Prefix = re:replace(File, "\\..*", "", [{return, binary}]), + %% verb(Ps, "File ~s, prefix ~s\n", [File, Prefix]), + verb("File ~s\n", [File]), + verb(" ~s MBytes, ", [mbytes(Size)]), + case calc_chain(read, P, ChainMap, Prefix) of + {[OldChain], _} -> + verb("remain in ~s\n", [OldChain]); + {[NewChain,OldChain|_], _} -> + verb("move ~s -> ~s\n", [OldChain, NewChain]), + case proplists:get_value(mode, Ps) of + repair -> + DestRawHPs = orddict:fetch(NewChain, ChainMap), + ok = migrate_a_file(SrcS, SrcS2, File, DestRawHPs, Ps), + case proplists:get_value(delete_source, Ps) of + true -> + verb(" delete source ~s\n", [File]), + ok = escript_delete(SrcS, File), + ok; + _ -> + verb(" skipping delete of source ~s\n", [File]) + end; + _ -> + ok + end + end, + process_files(Items, SrcS, SrcS2, P, ChainMap, Ps). + +migrate_a_file(SrcS, SrcS2, File, DestRawHPs, Ps) -> + SrcName = "hack-src", + DstName = "hack-dst", + [begin + [HostStr, PortStr] = convert_raw_hps([RawHP]), + DstS = get_cached_sock(HostStr, PortStr), + + X = escript_compare_servers(SrcS, DstS, + SrcName, DstName, + fun(FName) when FName == File -> true; + (_) -> false end, + [null]), + + CheckV = proplists:get_value(mode, Ps, check), + VerboseV = proplists:get_value(verbose, Ps, t), + [ok = repair(File, Size, MissingList, + CheckV, VerboseV, + SrcS, SrcS2, DstS, dst2_unused, SrcName) || + {_File, {Size, MissingList}} <- X] + end || RawHP <- DestRawHPs], + ok. diff --git a/prototype/demo-day-hack/file0_cc_read_client.escript b/prototype/demo-day-hack/file0_cc_read_client.escript new file mode 100644 index 0000000..4521664 --- /dev/null +++ b/prototype/demo-day-hack/file0_cc_read_client.escript @@ -0,0 +1,33 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_read_client). +-compile(export_all). +-mode(compile). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main(Args) -> + main2(["cc-chunk-read-client" | Args]). diff --git a/prototype/demo-day-hack/file0_checksum_list.escript b/prototype/demo-day-hack/file0_checksum_list.escript new file mode 100644 index 0000000..c21f15f --- /dev/null +++ b/prototype/demo-day-hack/file0_checksum_list.escript @@ -0,0 +1,37 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_checksum_list). +-compile(export_all). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main(["line-by-line"|Args]) -> + %% This is merely a demo to show the cost of line-by-line I/O. + %% For a checksum list of 90K lines, line-by-line takes about 3 seconds. + %% Bulk I/O (used by the following clause) takes about 0.2 seconds. + main2(["checksum-list-client-line-by-line" | Args]); +main(Args) -> + main2(["checksum-list-client" | Args]). diff --git a/prototype/demo-day-hack/file0_compare_filelists.escript b/prototype/demo-day-hack/file0_compare_filelists.escript new file mode 100644 index 0000000..d5814ad --- /dev/null +++ b/prototype/demo-day-hack/file0_compare_filelists.escript @@ -0,0 +1,41 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_compare_servers). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: Compare file lists on two servers and calculate missing files.\n"), + io:format("Args: Host1, Port1, Host2, Port2 [ | 'null' | OutputPath]\n"), + erlang:halt(1); +main([Host1, PortStr1, Host2, PortStr2|Args]) -> + Sock1 = escript_connect(Host1, PortStr1), + Sock2 = escript_connect(Host2, PortStr2), + escript_compare_servers(Sock1, Sock2, {Host1, PortStr1}, {Host2, PortStr2}, + Args). + diff --git a/prototype/demo-day-hack/file0_repair_server.escript b/prototype/demo-day-hack/file0_repair_server.escript new file mode 100644 index 0000000..68ccc39 --- /dev/null +++ b/prototype/demo-day-hack/file0_repair_server.escript @@ -0,0 +1,69 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_repair_server). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: Repair a server, *uni-directionally* from source -> destination.\n"), + io:format("Args: SrcHost SrcPort DstHost DstPort [ verbose check repair delete-source noverbose nodelete-source ]\n"), + erlang:halt(1); +main([SrcHost, SrcPortStr, DstHost, DstPortStr|Args]) -> + Src = {SrcHost, SrcPortStr}, + SockSrc = escript_connect(SrcHost, SrcPortStr), + %% TODO is SockSrc2 necessary? Is SockDst2 necessary? If not, delete! + SockSrc2 = escript_connect(SrcHost, SrcPortStr), + ok = inet:setopts(SockSrc2, [{packet, raw}]), + SockDst = escript_connect(DstHost, DstPortStr), + SockDst2 = escript_connect(DstHost, DstPortStr), + Ps = make_repair_props(Args), + case proplists:get_value(mode, Ps) of + undefined -> io:format("NOTICE: default mode = check\n"), + timer:sleep(2*1000); + _ -> ok + end, + case proplists:get_value(verbose, Ps) of + true -> + io:format("Date & Time: ~p ~p\n", [date(), time()]), + io:format("Src: ~s ~s\n", [SrcHost, SrcPortStr]), + io:format("Dst: ~s ~s\n", [DstHost, DstPortStr]), + io:format("\n"); + _ -> + ok + end, + %% Dst = {DstHost, DstPortStr}, + + X = escript_compare_servers(SockSrc, SockDst, + {SrcHost, SrcPortStr}, {DstHost, DstPortStr}, + fun(_FileName) -> true end, + [null]), + [repair(File, Size, MissingList, + proplists:get_value(mode, Ps, check), + proplists:get_value(verbose, Ps, t), + SockSrc, SockSrc2, SockDst, SockDst2, Src) || + {File, {Size, MissingList}} <- X]. diff --git a/prototype/demo-day-hack/file0_server_daemon.escript b/prototype/demo-day-hack/file0_server_daemon.escript new file mode 100644 index 0000000..6fa0e54 --- /dev/null +++ b/prototype/demo-day-hack/file0_server_daemon.escript @@ -0,0 +1,33 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 253 -smp enable -noinput -noshell -detached + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_server). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main(Args) -> + main2(["server" | Args]). diff --git a/prototype/demo-day-hack/file0_start_servers.escript b/prototype/demo-day-hack/file0_start_servers.escript new file mode 100644 index 0000000..69583c4 --- /dev/null +++ b/prototype/demo-day-hack/file0_start_servers.escript @@ -0,0 +1,43 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_start_servers). +-compile(export_all). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: Demonstrate the commands required to start all servers on a host.\n"), + io:format("Args: ServerMapPath Host\n"), + erlang:halt(1); +main([ServerMapPath, HostStr]) -> + Host = list_to_binary(HostStr), + {ok, Map} = file:consult(ServerMapPath), + io:format("Run the following commands to start all servers:\n\n"), + [begin + DataDir = proplists:get_value(data_dir, Ps), + io:format(" file0_server.escript file0_server ~w ~s\n", + [Port, DataDir]) + end || {{HostX, Port}, Ps} <- Map, HostX == Host]. diff --git a/prototype/demo-day-hack/file0_test.escript b/prototype/demo-day-hack/file0_test.escript new file mode 100644 index 0000000..04d532c --- /dev/null +++ b/prototype/demo-day-hack/file0_test.escript @@ -0,0 +1,107 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_test). +-compile(export_all). +-mode(compile). + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: unit test script.\n"), + + Port1 = "7061", + Port2 = "7062", + Dir1 = "/tmp/file0_test1", + Dir2 = "/tmp/file0_test2", + Pfx1 = "testing-prefix1", + Pfx2 = "testing-prefix2", + application:set_env(kernel, verbose, false), + + os:cmd("rm -rf " ++ Dir1), + ok = start_server(["file0_test", Port1, Dir1]), + os:cmd("rm -rf " ++ Dir2), + ok = start_server(["server2", Port2, Dir2]), + timer:sleep(250), + + %% Pattern match assumes that /etc/hosts exists and is at least 11 bytes. + [_,_|_] = Chunks1 = + test_file_write_client(["localhost", Port1, "5", Pfx1, "/etc/hosts", "/dev/null"]), + io:format("write: pass\n"), + + [_,_|_] = test_chunk_read_client(["localhost", Port1], Chunks1), + [{error,_}|_] = (catch test_chunk_read_client(["localhost", Port2], Chunks1)), + io:format("read: pass\n"), + + [_] = test_list_client(["localhost", Port1, "/dev/null"]), + [] = test_list_client(["localhost", Port2, "/dev/null"]), + io:format("list: pass\n"), + + %% Pattern match assumes that /etc/hosts exists and is at least 11 bytes. + [_,_,_|_] = _Chunks2 = + test_1file_write_redundant_client( + ["5", Pfx2, "/etc/hosts", "silent", "localhost", Port1, "localhost", Port2]), + [_,_] = test_list_client(["localhost", Port1, "/dev/null"]), + [_] = test_list_client(["localhost", Port2, "/dev/null"]), + io:format("write-redundant: pass\n"), + + [PoorChunk] = test_list_client(["localhost", Port2, "/dev/null"]), + PoorFile = re:replace(PoorChunk, ".* ", "", [{return,binary}]), + ok = test_delete_client(["localhost", Port2, PoorFile]), + error = test_delete_client(["localhost", Port2, PoorFile]), + io:format("delete: pass\n"), + + ok. + +start_server([_RegNameStr,_PortStr,_Dir] = Args) -> + spawn(fun() -> main2(["server" | Args]) end), + ok. + +test_file_write_client([_Host,_PortStr,_ChunkSizeStr,_PrefixStr,_LocalPath,_Output] = Args) -> + main2(["file-write-client" | Args]). + +test_1file_write_redundant_client([_ChunkSizeStr,_PrefixStr,_LocalPath|_] = Args) -> + main2(["1file-write-redundant-client" | Args]). + +test_chunk_read_client([_Host,_PortStr] = Args, Chunks) -> + ChunkFile = "/tmp/chunkfile." ++ os:getpid(), + {ok, FH} = file:open(ChunkFile, [write]), + [begin + OffsetHex = bin_to_hexstr(<>), + SizeHex = list_to_binary(bin_to_hexstr(<>)), + io:format(FH, "~s ~s ~s\n", [OffsetHex, SizeHex, File]) + end || {Offset, Size, File} <- Chunks], + file:close(FH), + try + main2(["chunk-read-client" | Args] ++ [ChunkFile, "/dev/null"]) + after + file:delete(ChunkFile) + end. + +test_list_client([_Host,_PortStr,_OutputFile] = Args) -> + main2(["list-client" | Args]). + +test_delete_client([_Host,_PortStr,_File] = Args) -> + main2(["delete-client" | Args]). diff --git a/prototype/demo-day-hack/file0_verify_checksums.escript b/prototype/demo-day-hack/file0_verify_checksums.escript new file mode 100644 index 0000000..2ef964e --- /dev/null +++ b/prototype/demo-day-hack/file0_verify_checksums.escript @@ -0,0 +1,83 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! +A 0 -smp disable -noinput -noshell + +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(file0_server). +-compile(export_all). +-mode(compile). % for escript use + +-define(NO_MODULE, true). +-include("./file0.erl"). + +main([]) -> + io:format("Use: Verify all chunk checksums of all files on a server.\n"), + io:format("Args: Host Port File\n"), + erlang:halt(1); +main([Host, PortStr, File]) -> + Sock1 = escript_connect(Host, PortStr), + Sock2 = escript_connect(Host, PortStr), + TmpFile = "/tmp/verify-checksums." ++ os:getpid(), + try + check_checksums(Sock1, Sock2, File, TmpFile) + after + _ = (catch file:delete(TmpFile)) + end. + +check_checksums(Sock1, Sock2, File, _TmpFile) -> + FileBin = list_to_binary(File), + put(count, 0), + Proc = fun(<>) -> + Where = <>, + ChunkProc = + fun(Chunk2) when is_binary(Chunk2) -> + CSum = hexstr_to_bin(CSumHex), + CSum2 = checksum(Chunk2), + if CSum == CSum2 -> + put(count, get(count) + 1), + ok; %% io:format("."); + true -> + CSumNow = bin_to_hexstr(CSum2), + put(status, failed), + io:format("~s ~s ~s CHECKSUM-ERROR ~s ~s\n", + [Offset, Len, File, + CSumHex, CSumNow]) + end; + (_Else) -> + ok % io:format("Chunk ~P\n", [Else, 10]) + end, + escript_download_chunks(Sock2, {{{Where}}}, ChunkProc); + (<<"OK", _/binary>>) -> + ok; %% io:format("top:"); + (<<".\n">>) -> + ok %% io:format("bottom\n") + end, + escript_checksum_list(Sock1, FileBin, line_by_line, Proc), + case get(status) of + ok -> + io:format("OK, ~w chunks are good\n", [get(count)]), + erlang:halt(); + _ -> + erlang:halt(1) + end. diff --git a/prototype/demo-day-hack/jerasure.Darwin/bin/enc-dec-wrapper.sh b/prototype/demo-day-hack/jerasure.Darwin/bin/enc-dec-wrapper.sh new file mode 100644 index 0000000..950f292 --- /dev/null +++ b/prototype/demo-day-hack/jerasure.Darwin/bin/enc-dec-wrapper.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +ENCDEC=$1 # Should be 'encoder' or 'decoder' +shift +WORK_DIR=$1 +shift + +case $0 in + */*) + MY_DIR=`dirname $0` + ;; + *) + TMP=`which $0` + MY_DIR=`dirname $TMP` + ;; +esac + +cd $MY_DIR +env CODING_DIR=$WORK_DIR ./$ENCDEC.bin "$@" > /dev/null 2>&1 +echo $? diff --git a/prototype/demo-day-hack/szone_chash.erl b/prototype/demo-day-hack/szone_chash.erl new file mode 100644 index 0000000..b6a1393 --- /dev/null +++ b/prototype/demo-day-hack/szone_chash.erl @@ -0,0 +1,376 @@ +%%%------------------------------------------------------------------- +%%% Copyright (c) 2007-2011 Gemini Mobile Technologies, Inc. All rights reserved. +%%% Copyright (c) 2013-2014 Basho Technologies, Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%% +%%%------------------------------------------------------------------- + +-module(szone_chash). + +-define(SMALLEST_SIGNIFICANT_FLOAT_SIZE, 0.1e-12). + +%% -compile(export_all). +-export([make_float_map/1, make_float_map/2, + sum_map_weights/1, + make_tree/1, + make_tree/2, + query_tree/2, + hash_string/2, + pretty_with_integers/2, + pretty_with_integers/3]). +-export([make_demo_map1/0, make_demo_map2/0]). + +-type map_name() :: term(). +-type float_map() :: [{map_name(), float()}]. + +-export_type([float_map/0]). + +make_float_map(NewChainWeights) -> + make_float_map([], NewChainWeights). + +make_float_map([], NewChainWeights) -> + Sum = add_all_weights(NewChainWeights), + DiffMap = [{Ch, Wt/Sum} || {Ch, Wt} <- NewChainWeights], + make_float_map2([{unused, 1.0}], DiffMap, NewChainWeights); +make_float_map(OldFloatMap, NewChainWeights) -> + NewSum = add_all_weights(NewChainWeights), + %% Normalize to unit interval + %% NewChainWeights2 = [{Ch, Wt / NewSum} || {Ch, Wt} <- NewChainWeights], + + %% Reconstruct old chain weights (will be normalized to unit interval) + SumOldFloatsDict = + lists:foldl(fun({Ch, Wt}, OrdDict) -> + orddict:update_counter(Ch, Wt, OrdDict) + end, orddict:new(), OldFloatMap), + OldChainWeights = orddict:to_list(SumOldFloatsDict), + OldSum = add_all_weights(OldChainWeights), + + OldChs = [Ch || {Ch, _} <- OldChainWeights], + NewChs = [Ch || {Ch, _} <- NewChainWeights], + OldChsOnly = OldChs -- NewChs, + + %% Mark any space in by a deleted chain as unused. + OldFloatMap2 = lists:map( + fun({Ch, Wt} = ChWt) -> + case lists:member(Ch, OldChsOnly) of + true -> + {unused, Wt}; + false -> + ChWt + end + end, OldFloatMap), + + %% Create a diff map of changing chains and added chains + DiffMap = lists:map(fun({Ch, NewWt}) -> + case orddict:find(Ch, SumOldFloatsDict) of + {ok, OldWt} -> + {Ch, (NewWt / NewSum) - + (OldWt / OldSum)}; + error -> + {Ch, NewWt / NewSum} + end + end, NewChainWeights), + make_float_map2(OldFloatMap2, DiffMap, NewChainWeights). + +make_float_map2(OldFloatMap, DiffMap, _NewChainWeights) -> + FloatMap = apply_diffmap(DiffMap, OldFloatMap), + XX = combine_neighbors(collapse_unused_in_float_map(FloatMap)), + XX. + +apply_diffmap(DiffMap, FloatMap) -> + SubtractDiff = [{Ch, abs(Diff)} || {Ch, Diff} <- DiffMap, Diff < 0], + AddDiff = [D || {_Ch, Diff} = D <- DiffMap, Diff > 0], + TmpFloatMap = iter_diffmap_subtract(SubtractDiff, FloatMap), + iter_diffmap_add(AddDiff, TmpFloatMap). + +add_all_weights(ChainWeights) -> + lists:foldl(fun({_Ch, Weight}, Sum) -> Sum + Weight end, 0.0, ChainWeights). + +iter_diffmap_subtract([{Ch, Diff}|T], FloatMap) -> + iter_diffmap_subtract(T, apply_diffmap_subtract(Ch, Diff, FloatMap)); +iter_diffmap_subtract([], FloatMap) -> + FloatMap. + +iter_diffmap_add([{Ch, Diff}|T], FloatMap) -> + iter_diffmap_add(T, apply_diffmap_add(Ch, Diff, FloatMap)); +iter_diffmap_add([], FloatMap) -> + FloatMap. + +apply_diffmap_subtract(Ch, Diff, [{Ch, Wt}|T]) -> + if Wt == Diff -> + [{unused, Wt}|T]; + Wt > Diff -> + [{Ch, Wt - Diff}, {unused, Diff}|T]; + Wt < Diff -> + [{unused, Wt}|apply_diffmap_subtract(Ch, Diff - Wt, T)] + end; +apply_diffmap_subtract(Ch, Diff, [H|T]) -> + [H|apply_diffmap_subtract(Ch, Diff, T)]; +apply_diffmap_subtract(_Ch, _Diff, []) -> + []. + +apply_diffmap_add(Ch, Diff, [{unused, Wt}|T]) -> + if Wt == Diff -> + [{Ch, Wt}|T]; + Wt > Diff -> + [{Ch, Diff}, {unused, Wt - Diff}|T]; + Wt < Diff -> + [{Ch, Wt}|apply_diffmap_add(Ch, Diff - Wt, T)] + end; +apply_diffmap_add(Ch, Diff, [H|T]) -> + [H|apply_diffmap_add(Ch, Diff, T)]; +apply_diffmap_add(_Ch, _Diff, []) -> + []. + +combine_neighbors([{Ch, Wt1}, {Ch, Wt2}|T]) -> + combine_neighbors([{Ch, Wt1 + Wt2}|T]); +combine_neighbors([H|T]) -> + [H|combine_neighbors(T)]; +combine_neighbors([]) -> + []. + +collapse_unused_in_float_map([{Ch, Wt1}, {unused, Wt2}|T]) -> + collapse_unused_in_float_map([{Ch, Wt1 + Wt2}|T]); +collapse_unused_in_float_map([{unused, _}] = L) -> + L; % Degenerate case only +collapse_unused_in_float_map([H|T]) -> + [H|collapse_unused_in_float_map(T)]; +collapse_unused_in_float_map([]) -> + []. + +chash_float_map_to_nextfloat_list(FloatMap) when length(FloatMap) > 0 -> + %% QuickCheck found a bug ... need to weed out stuff smaller than + %% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE here. + FM1 = [P || {_X, Y} = P <- FloatMap, Y > ?SMALLEST_SIGNIFICANT_FLOAT_SIZE], + {_Sum, NFs0} = lists:foldl(fun({Name, Amount}, {Sum, List}) -> + {Sum+Amount, [{Sum+Amount, Name}|List]} + end, {0, []}, FM1), + lists:reverse(NFs0). + +chash_nextfloat_list_to_gb_tree([]) -> + gb_trees:balance(gb_trees:from_orddict([])); +chash_nextfloat_list_to_gb_tree(NextFloatList) -> + {_FloatPos, Name} = lists:last(NextFloatList), + %% QuickCheck found a bug ... it really helps to add a catch-all item + %% at the far "right" of the list ... 42.0 is much greater than 1.0. + NFs = NextFloatList ++ [{42.0, Name}], + gb_trees:balance(gb_trees:from_orddict(orddict:from_list(NFs))). + +chash_gb_next(X, {_, GbTree}) -> + chash_gb_next1(X, GbTree). + +chash_gb_next1(X, {Key, Val, Left, _Right}) when X < Key -> + case chash_gb_next1(X, Left) of + nil -> + {Key, Val}; + Res -> + Res + end; +chash_gb_next1(X, {Key, _Val, _Left, Right}) when X >= Key -> + chash_gb_next1(X, Right); +chash_gb_next1(_X, nil) -> + nil. + +%% @type float_map() = list({brick(), float()}). A float_map is a +%% definition of brick assignments over the unit interval [0.0, 1.0]. +%% The sum of all floats must be 1.0. +%% For example, [{{br1, nd1}, 0.25}, {{br2, nd1}, 0.5}, {{br3, nd1}, 0.25}]. + +%% @type nextfloat_list() = list({float(), brick()}). A nextfloat_list +%% differs from a float_map in two respects: 1) nextfloat_list contains +%% tuples with the brick name in 2nd position, 2) the float() at each +%% position I_n > I_m, for all n, m such that n > m. +%% For example, a nextfloat_list of the float_map example above, +%% [{0.25, {br1, nd1}}, {0.75, {br2, nd1}}, {1.0, {br3, nd1}]. + +%% @doc Not used directly, but can give a developer an idea of how well +%% chash_float_map_to_nextfloat_list will do for a given value of Max. +%% +%% For example: +%% +%% NewFloatMap = make_float_map([{unused, 1.0}], +%% [{a,100}, {b, 100}, {c, 10}]), +%% ChashMap = chash_scale_to_int_interval(NewFloatMap, 100), +%% io:format("QQQ: int int = ~p\n", [ChashIntInterval]), +%% -> [{a,1,47},{b,48,94},{c,94,100}] +%% +%% +%% Interpretation: out of the 100 slots: +%%
    +%%
  • 'a' uses the slots 1-47
  • +%%
  • 'b' uses the slots 48-94
  • +%%
  • 'c' uses the slots 95-100
  • +%%
+ +chash_scale_to_int_interval(NewFloatMap, Max) -> + chash_scale_to_int_interval(NewFloatMap, 0, Max). + +chash_scale_to_int_interval([{Ch, _Wt}], Cur, Max) -> + [{Ch, Cur, Max}]; +chash_scale_to_int_interval([{Ch, Wt}|T], Cur, Max) -> + Int = trunc(Wt * Max), + [{Ch, Cur + 1, Cur + Int}|chash_scale_to_int_interval(T, Cur + Int, Max)]. + +%%%%%%%%%%%%% + +pretty_with_integers(Map, Scale) -> + chash_scale_to_int_interval(Map, Scale). + +pretty_with_integers(OldWeights, NewWeights, Scale) -> + chash_scale_to_int_interval( + make_float_map(make_float_map(OldWeights), + NewWeights), + Scale). + +make_tree(Map) -> + chash_nextfloat_list_to_gb_tree( + chash_float_map_to_nextfloat_list(Map)). + +make_tree(OldWeights, NewWeights) -> + chash_nextfloat_list_to_gb_tree( + chash_float_map_to_nextfloat_list( + make_float_map(make_float_map(OldWeights), NewWeights))). + +query_tree(Val, Tree) when is_float(Val), 0.0 =< Val, Val =< 1.0 -> + chash_gb_next(Val, Tree). + +make_demo_map1() -> + {_, Res} = make_demo_map1_i(), + Res. + +make_demo_map1_i() -> + Fail1 = {b, 100}, + L1 = [{a, 100}, Fail1, {c, 100}], + L2 = L1 ++ [{d, 100}, {e, 100}], + L3 = L2 -- [Fail1], + L4 = L3 ++ [{giant, 300}], + {L4, lists:foldl(fun(New, Old) -> make_float_map(Old, New) end, + make_float_map(L1), [L2, L3, L4])}. + +make_demo_map2() -> + {L0, _} = make_demo_map1_i(), + L1 = L0 ++ [{h, 100}], + L2 = L1 ++ [{i, 100}], + L3 = L2 ++ [{j, 100}], + lists:foldl(fun(New, Old) -> make_float_map(Old, New) end, + make_demo_map1(), [L1, L2, L3]). + +sum_map_weights(Map) -> + L = sum_map_weights(lists:sort(Map), undefined, 0.0) -- [{undefined,0.0}], + WeightSum = lists:sum([Weight || {_, Weight} <- L]), + {{per_szone, L}, {weight_sum, WeightSum}}. + +sum_map_weights([{SZ, Weight}|T], SZ, SZ_total) -> + sum_map_weights(T, SZ, SZ_total + Weight); +sum_map_weights([{SZ, Weight}|T], LastSZ, LastSZ_total) -> + [{LastSZ, LastSZ_total}|sum_map_weights(T, SZ, Weight)]; +sum_map_weights([], LastSZ, LastSZ_total) -> + [{LastSZ, LastSZ_total}]. + +-define(SHA_MAX, (1 bsl (20*8))). + +%% Optimization is certainly possible here. + +hash_string(Key, Map) -> + catch crypto:start(), + Tree = make_tree(Map), + <> = crypto:hash(sha, Key), + Float = Int / ?SHA_MAX, + query_tree(Float, Tree). + +%%%%% Examples + +%% %% Make a map. See the code for make_demo_map1() for the order of +%% %% additions & deletions. Here's a brief summary of the 4 steps. +%% %% +%% %% * 'a' through 'e' are weighted @ 100. +%% %% * 'giant' is weighted @ 300. +%% %% * 'b' is removed at step #3. + +%% 40> M1 = szone_chash:make_demo_map1(). +%% [{a,0.09285714285714286}, +%% {giant,0.10714285714285715}, +%% {d,0.026190476190476153}, +%% {giant,0.10714285714285715}, +%% {a,0.04999999999999999}, +%% {giant,0.04999999999999999}, +%% {d,0.04999999999999999}, +%% {giant,0.050000000000000044}, +%% {d,0.06666666666666671}, +%% {e,0.009523809523809434}, +%% {giant,0.05714285714285716}, +%% {c,0.14285714285714285}, +%% {giant,0.05714285714285716}, +%% {e,0.13333333333333341}] + + +%% %% Map M1 onto the interval of integers 0-10,1000 +%% %% +%% %% output = list({SZ_name::term(), Start::integer(), End::integer()}) + +%% 41> szone_chash:pretty_with_integers(M1, 10*1000). +%% [{a,1,928}, +%% {giant,929,1999}, +%% {d,2000,2260}, +%% {giant,2261,3331}, +%% {a,3332,3830}, +%% {giant,3831,4329}, +%% {d,4330,4828}, +%% {giant,4829,5328}, +%% {d,5329,5994}, +%% {e,5995,6089}, +%% {giant,6090,6660}, +%% {c,6661,8088}, +%% {giant,8089,8659}, +%% {e,8659,10000}] + +%% %% Sum up all of the weights, make sure it's what we expect: + +%% 55> szone_chash:sum_map_weights(M1). +%% {{per_szone,[{a,0.14285714285714285}, +%% {c,0.14285714285714285}, +%% {d,0.14285714285714285}, +%% {e,0.14285714285714285}, +%% {giant,0.42857142857142866}]}, +%% {weight_sum,1.0}} + +%% %% Make a tree, then query it +%% %% (Hash::float(), tree()) -> {NextLargestBoundary::float(), szone()} + +%% 58> T1 = szone_chash:make_tree(M1). +%% 59> szone_chash:query_tree(0.2555, T1). +%% {0.3333333333333333,giant} +%% 60> szone_chash:query_tree(0.3555, T1). +%% {0.3833333333333333,a} +%% 61> szone_chash:query_tree(0.4555, T1). +%% {0.4833333333333333,d} + +%% %% How about hashing a bunch of strings and see what happens? + +%% 74> Key1 = "Hello, world!". +%% "Hello, world!" +%% 75> [{K, element(2, szone_chash:hash_string(K, M1))} || K <- [lists:sublist(Key1, X) || X <- lists:seq(1, length(Key1))]]. +%% [{"H",giant}, +%% {"He",giant}, +%% {"Hel",giant}, +%% {"Hell",e}, +%% {"Hello",e}, +%% {"Hello,",giant}, +%% {"Hello, ",e}, +%% {"Hello, w",e}, +%% {"Hello, wo",giant}, +%% {"Hello, wor",d}, +%% {"Hello, worl",giant}, +%% {"Hello, world",e}, +%% {"Hello, world!",d}]