diff --git a/.gitignore b/.gitignore index 09dacbc..5243bad 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ prototype/chain-manager/patch.* +.eqc-info .eunit deps *.plt @@ -6,3 +7,6 @@ erl_crash.dump .concrete/DEV_MODE .rebar edoc + +# PB artifacts for Erlang +include/machi_pb.hrl diff --git a/Makefile b/Makefile index a118de4..c273ed6 100644 --- a/Makefile +++ b/Makefile @@ -37,13 +37,18 @@ PLT = $(HOME)/.machi_dialyzer_plt build_plt: deps compile dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin +DIALYZER_DEP_APPS = ebin/machi_pb.beam deps/protobuffs/ebin +DIALYZER_FLAGS = -Wno_return -Wrace_conditions -Wunderspecs + dialyzer: deps compile - dialyzer -Wno_return --plt $(PLT) ebin + dialyzer $(DIALYZER_FLAGS) --plt $(PLT) ebin $(DIALYZER_DEP_APPS) | \ + egrep -v -f ./filter-dialyzer-dep-warnings dialyzer-test: deps compile echo Force rebar to recompile .eunit dir w/o running tests > /dev/null rebar skip_deps=true eunit suite=lamport_clock - dialyzer -Wno_return --plt $(PLT) .eunit + dialyzer $(DIALYZER_FLAGS) --plt $(PLT) .eunit $(DIALYZER_DEP_APPS) | \ + egrep -v -f ./filter-dialyzer-dep-warnings clean_plt: rm $(PLT) diff --git a/filter-dialyzer-dep-warnings b/filter-dialyzer-dep-warnings new file mode 100644 index 0000000..b71c024 --- /dev/null +++ b/filter-dialyzer-dep-warnings @@ -0,0 +1,12 @@ +####################### patterns for general errors in dep modules: +^protobuffs\.erl: +^protobuffs_[a-z_]*\.erl: +^leexinc\.hrl:[0-9][0-9]*: +^machi_chain_manager1.erl:[0-9][0-9]*: Guard test RetrospectiveP::'false' =:= 'true' can never succeed +^machi_pb\.erl:[0-9][0-9]*: +^pokemon_pb\.erl:[0-9][0-9]*: +####################### patterns for unknown functions: +^ basho_bench_config:get/2 +^ erl_prettypr:format/1 +^ erl_syntax:form_list/1 +^ machi_partition_simulator:get/1 diff --git a/rebar b/rebar index 146c9aa..d53a235 100755 Binary files a/rebar and b/rebar differ diff --git a/rebar.config b/rebar.config index 88f9c3d..bb37270 100644 --- a/rebar.config +++ b/rebar.config @@ -5,6 +5,7 @@ {edoc_opts, [{dir, "./edoc"}]}. {deps, [ - {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}} + {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}, + {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}} ]}. diff --git a/src/machi.proto b/src/machi.proto new file mode 100644 index 0000000..993ff42 --- /dev/null +++ b/src/machi.proto @@ -0,0 +1,185 @@ +/* ------------------------------------------------------------------- +** +** machi.proto: Protocol Buffers definition for Machi +** +** Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved. +** +** This file is provided to you under the Apache License, +** Version 2.0 (the "License"); you may not use this file +** except in compliance with the License. You may obtain +** a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, +** software distributed under the License is distributed on an +** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +** KIND, either express or implied. See the License for the +** specific language governing permissions and limitations +** under the License. 
+** +** ------------------------------------------------------------------- +*/ + +/* +** Revision: 0.1 +*/ + +// Java package specifiers +option java_package = "com.basho.machi.protobuf"; +option java_outer_classname = "MachiPB"; + +////////////////////////////////////////// +// +// enums +// +////////////////////////////////////////// + +enum Mpb_GeneralStatusCode { + OK = 0; + BAD_ARG = 1; + WEDGED = 2; + BAD_CHECKSUM = 3; + PARTITION = 4; +} + +// Must match with machi.hrl's values! +enum Mpb_CSumType { + CSUM_TAG_NONE = 0; + CSUM_TAG_CLIENT_GEN = 1; + CSUM_TAG_SERVER_GEN = 2; + CSUM_TAG_SERVER_REGEN = 3; +} + +////////////////////////////////////////// +// +// basic data types +// +////////////////////////////////////////// + +// chunk_pos() type +message Mpb_ChunkPos { + required uint64 offset = 1; + required uint64 chunk_size = 2; + required string file_name = 3; +} + +// chunk_csum() type +message Mpb_ChunkCSum { + required Mpb_CSumType type = 1; + required bytes csum = 2; +} + +// epoch_id() type +message Mpb_EpochId { + required uint32 epoch_num = 1; + required Mpb_ChunkCSum epoch_csum = 2; +} + +////////////////////////////////////////// +// +// requests & responses +// +////////////////////////////////////////// + +// Error response - may be generated for any Req +message Mpb_ErrorResp { + // Free-form (depends on server, which is probably a bad idea TODO) + required int32 code = 1; + required string msg = 2; + optional bytes extra = 3; +} + +// ping() request & response + +message Mpb_EchoReq { + optional string message = 1; +} + +message Mpb_EchoResp { + optional string message = 1; +} + +// Authentication request & response + +message Mpb_AuthReq { + required bytes user = 1; + required bytes password = 2; +} + +message Mpb_AuthResp { + required uint32 code = 1; + // TODO: not implemented yet +} + +// append_chunk() request & response + +message Mpb_AppendChunkReq { + required string prefix = 1; + optional bytes placement_key = 2; + required bytes chunk = 3; + optional uint32 chunk_extra = 4; +} + +message Mpb_AppendChunkResp { + required Mpb_GeneralStatusCode status = 1; + // If OK, then chunk_pos is defined. + optional Mpb_ChunkPos chunk_pos = 2; +} + +// write_chunk() request & response + +message Mpb_WriteChunkReq { + required string file = 1; + required uint64 offset = 2; + required bytes chunk = 3; +} + +message Mpb_WriteChunkResp { + required Mpb_GeneralStatusCode status = 1; + // If OK, then chunk_pos is defined. + optional Mpb_ChunkPos chunk_pos = 2; +} + +////////////////////////////////////////// +// +// request & response wrapper +// +////////////////////////////////////////// + +message Mpb_Request { + // TODO: If we wish to support pipelined requests sometime in the + // future, this is the placeholder to do it. + required bytes req_id = 1; + + // The client should only define one request message. If the client + // includes multiple requests here, the server may pick/choose an + // arbitrary one. + // NOTE: The erlang protobuffs compiler doesn't support 'oneof'. + // But 'oneof' appears to be a very tiny memory optimization + // that not all languages might care about? (Erlang doesn't) + optional Mpb_EchoReq echo = 10; + optional Mpb_AuthReq auth = 11; + optional Mpb_AppendChunkReq append_chunk = 12; + optional Mpb_WriteChunkReq write_chunk = 13; +} + +message Mpb_Response { + // TODO: If we wish to support pipelined requests sometime in the + // future, this is the placeholder to do it. 
+ required bytes req_id = 1; + + // The server will define only one of the optional responses below. + + // Generic error response, typically used when something quite + // bad/unexpected happened within the server. + // Clients should always check this response and, if defined, + // ignore any request-specific response at codes 10+. + optional Mpb_ErrorResp generic = 2; + + // Specific responses. + optional Mpb_EchoResp echo = 10; + optional Mpb_AuthResp auth = 11; + optional Mpb_AppendChunkResp append_chunk = 12; + optional Mpb_WriteChunkResp write_chunk = 13; +} diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index 207adf9..f5702c2 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -22,10 +22,6 @@ -module(machi_admin_util). -%% TODO Move these types to a common header file? (also machi_flu1_client.erl?) --type inet_host() :: inet:ip_address() | inet:hostname(). --type inet_port() :: inet:port_number(). - -export([ verify_file_checksums_local/3, verify_file_checksums_local/4, verify_file_checksums_remote/3, verify_file_checksums_remote/4 @@ -37,13 +33,13 @@ -define(FLU_C, machi_flu1_client). --spec verify_file_checksums_local(port(), machi_flu1_client:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_local(port(), machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) -> verify_file_checksums_local2(Sock1, EpochID, Path). --spec verify_file_checksums_local(inet_host(), inet_port(), - machi_flu1_client:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_local(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), @@ -53,13 +49,13 @@ verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> catch ?FLU_C:disconnect(Sock1) end. --spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_remote(port(), machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) -> verify_file_checksums_remote2(Sock1, EpochID, File). --spec verify_file_checksums_remote(inet_host(), inet_port(), - machi_flu1_client:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_remote(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Host, TcpPort, EpochID, File) -> Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), diff --git a/src/machi_basho_bench_driver.erl b/src/machi_basho_bench_driver.erl index 090f3a6..6fd4b59 100644 --- a/src/machi_basho_bench_driver.erl +++ b/src/machi_basho_bench_driver.erl @@ -26,7 +26,7 @@ %% use basho_bench to measure its performance under a certain %% workload. Machi is a bit different than most KV stores in that the %% client has no direct control over the keys -- Machi servers always -%% assign the keys. The schemes typically used by basho_bench & YCSB +%% assign the keys. The schemes typically used by basho_bench & YCSB %% to use/mimic key naming conventions used internally ... are %% difficult to adapt to Machi.
%% @@ -50,7 +50,7 @@ %% %% TODO: As an alternate idea, if we know that the chunks written are %% always the same size, and if we don't care about CRC checking, then -%% all we need to know are the file names & file sizes on the server: +%% all we need to know are the file names & file sizes on the server: %% we can then pick any valid offset within that file. That would %% certainly be more scalable than the zillion-row-ETS-table, which is %% definitely RAM-hungry. @@ -67,9 +67,9 @@ -define(ETS_TAB, machi_keys). -define(THE_TIMEOUT, 60*1000). --define(INFO(Str, Args), lager:info(Str, Args)). --define(WARN(Str, Args), lager:warning(Str, Args)). --define(ERROR(Str, Args), lager:error(Str, Args)). +-define(INFO(Str, Args), (_ = lager:info(Str, Args))). +-define(WARN(Str, Args), (_ = lager:warning(Str, Args))). +-define(ERROR(Str, Args), (_ = lager:error(Str, Args))). new(Id) -> Ps = find_server_info(Id), diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index ac7134b..09ef18d 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -119,6 +119,7 @@ %% File API append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, + write_chunk/4, write_chunk/5, read_chunk/4, read_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2, @@ -180,6 +181,19 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> Chunk, ChunkExtra}}, Timeout). +%% @doc Write a chunk of data (that has already been +%% allocated/sequenced by an earlier append_chunk_extra() call) to +%% `File' at `Offset'. + +write_chunk(PidSpec, File, Offset, Chunk) -> + write_chunk(PidSpec, File, Offset, Chunk, ?DEFAULT_TIMEOUT). + +%% @doc Write a chunk of data to `File' at `Offset', with an explicit `Timeout'. + +write_chunk(PidSpec, File, Offset, Chunk, Timeout) -> + gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}}, + Timeout). + %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. read_chunk(PidSpec, File, Offset, Size) -> @@ -252,6 +266,8 @@ code_change(_OldVsn, S, _Extra) -> handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); +handle_call2({write_chunk, File, Offset, Chunk}, _From, S) -> + do_write_head(File, Offset, Chunk, 0, os:timestamp(), S); handle_call2({read_chunk, File, Offset, Size}, _From, S) -> do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); handle_call2({checksum_list, File}, _From, S) -> @@ -338,8 +354,13 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, %% over with a new sequencer assignment, at %% the 2nd half of the impl (we have already %% slept & refreshed the projection). - do_append_head2(Prefix, Chunk, ChunkExtra, Depth, - STime, S2); + + if Prefix == undefined -> % atom! not binary()!! + {error, partition}; + true -> + do_append_head2(Prefix, Chunk, ChunkExtra, + Depth, STime, S2) + end; RestFLUs3 -> do_append_midtail2(RestFLUs3, Prefix, File, Offset, Chunk, ChunkExtra, @@ -378,6 +399,51 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) end.
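%% Illustrative usage sketch (not part of the patch itself): how a caller
%% might combine the existing append_chunk_extra/4 with the new
%% machi_cr_client:write_chunk/4 added above. The function name
%% `sketch_write_reserved' is hypothetical; `C1' is assumed to be a started
%% machi_cr_client and `Prefix' a file prefix, as in the smoke test that
%% exercises this same sequence later in this patch.
sketch_write_reserved(C1, Prefix) ->
    Chunk = <<"some bytes">>,
    Size = byte_size(Chunk),
    %% Append Chunk and ask the sequencer to reserve room for one more
    %% chunk of the same size immediately after it.
    {ok, {Off, Size, File}} =
        machi_cr_client:append_chunk_extra(C1, Prefix, Chunk, Size),
    %% Fill the reserved slot at its known, pre-sequenced offset.
    {ok, {_, Size, File}} =
        machi_cr_client:write_chunk(C1, File, Off + Size, Chunk),
    %% Read it back to confirm the write landed where expected.
    {ok, Chunk} = machi_cr_client:read_chunk(C1, File, Off + Size, Size).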
+do_write_head(File, Offset, Chunk, 0=Depth, STime, S) -> + do_write_head2(File, Offset, Chunk, Depth + 1, STime, S); +do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "head sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_write_head(File, Offset, Chunk, Depth + 1, STime, S2); + _ -> + do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2) + end + end. + +do_write_head2(File, Offset, Chunk, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + %% From this point onward, we use the same code & logic path as + %% append does. + do_append_midtail(RestFLUs, undefined, File, Offset, Chunk, + undefined, [HeadFLU], 0, STime, S); + {error, bad_checksum}=BadCS -> + {reply, BadCS, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_write_head(File, Offset, Chunk, Depth, STime, S); + {error, written}=Err -> + Err; + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE, + iolist_size(Chunk)}) + end. + do_read_chunk(File, Offset, Size, 0=Depth, STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); diff --git a/src/machi_dt.erl b/src/machi_dt.erl new file mode 100644 index 0000000..6d94e6d --- /dev/null +++ b/src/machi_dt.erl @@ -0,0 +1,70 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_dt). + +-include("machi_projection.hrl"). + +-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. +-type chunk_bin() :: binary() | iolist(). % client can use either +-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum +-type chunk_summary() :: {file_offset(), chunk_size(), binary()}. +-type chunk_s() :: binary(). % server always uses binary() +-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. +-type chunk_size() :: non_neg_integer(). +-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'. +-type epoch_csum() :: binary(). +-type epoch_num() :: -1 | non_neg_integer(). +-type epoch_id() :: {epoch_num(), epoch_csum()}. +-type file_info() :: {file_size(), file_name_s()}. 
+-type file_name() :: binary() | list(). +-type file_name_s() :: binary(). % server reply +-type file_offset() :: non_neg_integer(). +-type file_size() :: non_neg_integer(). +-type file_prefix() :: binary() | list(). +-type inet_host() :: inet:ip_address() | inet:hostname(). +-type inet_port() :: inet:port_number(). +-type projection() :: #projection_v1{}. +-type projection_type() :: 'public' | 'private'. + +-export_type([ + chunk/0, + chunk_bin/0, + chunk_csum/0, + chunk_summary/0, + chunk_s/0, + chunk_pos/0, + chunk_size/0, + error_general/0, + epoch_csum/0, + epoch_num/0, + epoch_id/0, + file_info/0, + file_name/0, + file_name_s/0, + file_offset/0, + file_size/0, + file_prefix/0, + inet_host/0, + inet_port/0, + projection/0, + projection_type/0 + ]). + diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 95424a0..af507c2 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -75,6 +75,7 @@ -include_lib("kernel/include/file.hrl"). -include("machi.hrl"). +-include("machi_pb.hrl"). -include("machi_projection.hrl"). -define(SERVER_CMD_READ_TIMEOUT, 600*1000). @@ -91,7 +92,7 @@ data_dir :: string(), wedged = true :: boolean(), etstab :: ets:tid(), - epoch_id :: 'undefined' | pv1_epoch(), + epoch_id :: 'undefined' | machi_dt:epoch_id(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist }). @@ -343,6 +344,10 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> http_server_hack(FluName, PutLine, Sock, S); <<"GET ", _/binary>>=PutLine -> http_server_hack(FluName, PutLine, Sock, S); + <<"PROTOCOL-BUFFERS\n">> -> + ok = gen_tcp:send(Sock, <<"OK\n">>), + ok = inet:setopts(Sock, [{packet, 4}]), + protocol_buffers_loop(Sock, S); _ -> machi_util:verb("Else Got: ~p\n", [Line]), io:format(user, "TODO: Else Got: ~p\n", [Line]), @@ -1009,6 +1014,20 @@ http_harvest_headers({ok, Hdr}, Sock, Acc) -> http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), Sock, [Hdr|Acc]). +protocol_buffers_loop(Sock, S) -> + case gen_tcp:recv(Sock, 0) of + {ok, _Bin} -> + R = #mpb_response{req_id= <<"not paying any attention">>, + generic=#mpb_errorresp{code=-6, + msg="not implemented"}}, + Resp = machi_pb:encode_mpb_response(R), + ok = gen_tcp:send(Sock, Resp), + protocol_buffers_loop(Sock, S); + {error, _} -> + (catch gen_tcp:close(Sock)), + exit(normal) + end. + digest_header_goop([], G) -> G; digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index da4a293..c19a36a 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -85,47 +85,22 @@ trunc_hack/3, trunc_hack/4 ]). -%% TODO: Hrm, this kind of API use ... is it a bad idea? We really want to -%% encourage client-side checksums; thus it ought to be dead easy. --type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. --type chunk_bin() :: binary() | iolist(). % client can use either --type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum --type chunk_summary() :: {file_offset(), chunk_size(), binary()}. --type chunk_s() :: binary(). % server always uses binary() --type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. --type chunk_size() :: non_neg_integer(). --type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'. --type epoch_csum() :: binary(). --type epoch_num() :: -1 | non_neg_integer(). --type epoch_id() :: {epoch_num(), epoch_csum()}. --type file_info() :: {file_size(), file_name_s()}. --type file_name() :: binary() | list().
--type file_name_s() :: binary(). % server reply --type file_offset() :: non_neg_integer(). --type file_size() :: non_neg_integer(). --type file_prefix() :: binary() | list(). --type inet_host() :: inet:ip_address() | inet:hostname(). --type inet_port() :: inet:port_number(). -type port_wrap() :: {w,atom(),term()}. --type projection() :: #projection_v1{}. --type projection_type() :: 'public' | 'private'. - --export_type([epoch_id/0]). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. +-spec append_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk2(Sock, EpochID, Prefix, Chunk, 0). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(inet_host(), inet_port(), - epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. +-spec append_chunk(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -142,8 +117,8 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. +-spec append_chunk_extra(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra). @@ -156,9 +131,9 @@ append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(inet_host(), inet_port(), - epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. +-spec append_chunk_extra(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -170,9 +145,9 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. 
--spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | - {error, error_general() | 'not_written' | 'partial_read'} | +-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_s()} | + {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Sock, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -180,10 +155,10 @@ read_chunk(Sock, EpochID, File, Offset, Size) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(inet_host(), inet_port(), epoch_id(), - file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | - {error, error_general() | 'not_written' | 'partial_read'} | +-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), + machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_s()} | + {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -196,18 +171,18 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size) %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(port_wrap(), epoch_id(), file_name()) -> - {ok, [chunk_summary()]} | - {error, error_general() | 'no_such_file' | 'partial_read'} | +-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + {ok, [machi_dt:chunk_summary()]} | + {error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. checksum_list(Sock, EpochID, File) -> checksum_list2(Sock, EpochID, File). %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) -> - {ok, [chunk_summary()]} | - {error, error_general() | 'no_such_file'} | {error, term()}. +-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + {ok, [machi_dt:chunk_summary()]} | + {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -218,15 +193,15 @@ checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %% @doc Fetch the list of all files on the remote FLU. --spec list_files(port_wrap(), epoch_id()) -> - {ok, [file_info()]} | {error, term()}. +-spec list_files(port_wrap(), machi_dt:epoch_id()) -> + {ok, [machi_dt:file_info()]} | {error, term()}. list_files(Sock, EpochID) -> list2(Sock, EpochID). %% @doc Fetch the list of all files on the remote FLU. --spec list_files(inet_host(), inet_port(), epoch_id()) -> - {ok, [file_info()]} | {error, term()}. +-spec list_files(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id()) -> + {ok, [machi_dt:file_info()]} | {error, term()}. list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -238,15 +213,15 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> %% @doc Fetch the wedge status from the remote FLU. -spec wedge_status(port_wrap()) -> - {ok, {boolean(), pv1_epoch()}} | {error, term()}. + {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}. wedge_status(Sock) -> wedge_status2(Sock). %% @doc Fetch the wedge status from the remote FLU. 
--spec wedge_status(inet_host(), inet_port()) -> - {ok, {boolean(), pv1_epoch()}} | {error, term()}. +-spec wedge_status(machi_dt:inet_host(), machi_dt:inet_port()) -> + {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}. wedge_status(Host, TcpPort) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -257,16 +232,16 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epochid(port_wrap(), projection_type()) -> - {ok, epoch_id()} | {error, term()}. +-spec get_latest_epochid(port_wrap(), machi_dt:projection_type()) -> + {ok, machi_dt:epoch_id()} | {error, term()}. get_latest_epochid(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> get_latest_epochid2(Sock, ProjType). %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epochid(inet_host(), inet_port(), projection_type()) -> - {ok, epoch_id()} | {error, term()}. +-spec get_latest_epochid(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> + {ok, machi_dt:epoch_id()} | {error, term()}. get_latest_epochid(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -278,17 +253,17 @@ get_latest_epochid(Host, TcpPort, ProjType) %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(port_wrap(), projection_type()) -> - {ok, projection()} | {error, not_written} | {error, term()}. +-spec read_latest_projection(port_wrap(), machi_dt:projection_type()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_latest_projection(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> read_latest_projection2(Sock, ProjType). %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(inet_host(), inet_port(), - projection_type()) -> - {ok, projection()} | {error, not_written} | {error, term()}. +-spec read_latest_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_latest_projection(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -300,17 +275,17 @@ read_latest_projection(Host, TcpPort, ProjType) %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(port_wrap(), projection_type(), epoch_num()) -> - {ok, projection()} | {error, not_written} | {error, term()}. +-spec read_projection(port_wrap(), machi_dt:projection_type(), machi_dt:epoch_num()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_projection(Sock, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> read_projection2(Sock, ProjType, Epoch). %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(inet_host(), inet_port(), - projection_type(), epoch_num()) -> - {ok, projection()} | {error, not_written} | {error, term()}. +-spec read_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type(), machi_dt:epoch_num()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. 
read_projection(Host, TcpPort, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -322,7 +297,7 @@ read_projection(Host, TcpPort, ProjType, Epoch) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(port_wrap(), projection_type(), projection()) -> +-spec write_projection(port_wrap(), machi_dt:projection_type(), machi_dt:projection()) -> 'ok' | {error, 'written'} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -331,8 +306,8 @@ write_projection(Sock, ProjType, Proj) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(inet_host(), inet_port(), - projection_type(), projection()) -> +-spec write_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type(), machi_dt:projection()) -> 'ok' | {error, 'written'} | {error, term()}. write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -346,17 +321,16 @@ write_projection(Host, TcpPort, ProjType, Proj) %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(port_wrap(), projection_type()) -> - {ok, [projection()]} | {error, term()}. +-spec get_all_projections(port_wrap(), machi_dt:projection_type()) -> + {ok, [machi_dt:projection()]} | {error, term()}. get_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> get_all_projections2(Sock, ProjType). %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(inet_host(), inet_port(), - projection_type()) -> - {ok, [projection()]} | {error, term()}. +-spec get_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> + {ok, [machi_dt:projection()]} | {error, term()}. get_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -368,7 +342,7 @@ get_all_projections(Host, TcpPort, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(port_wrap(), projection_type()) -> +-spec list_all_projections(port_wrap(), machi_dt:projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -376,8 +350,7 @@ list_all_projections(Sock, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(inet_host(), inet_port(), - projection_type()) -> +-spec list_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -419,8 +392,8 @@ disconnect(_) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, error_general()} | {error, term()}. +-spec write_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> write_chunk2(Sock, EpochID, File, Offset, Chunk). 
@@ -428,9 +401,9 @@ write_chunk(Sock, EpochID, File, Offset, Chunk) %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(inet_host(), inet_port(), - epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, error_general()} | {error, term()}. +-spec write_chunk(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -443,16 +416,16 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port_wrap(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. +-spec delete_migration(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. delete_migration(Sock, EpochID, File) -> delete_migration2(Sock, EpochID, File). %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. +-spec delete_migration(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -464,16 +437,16 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port_wrap(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. +-spec trunc_hack(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Sock, EpochID, File) -> trunc_hack2(Sock, EpochID, File). %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. +-spec trunc_hack(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try diff --git a/src/machi_util.erl b/src/machi_util.erl index aae5963..2b8fb97 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -48,7 +48,7 @@ %% @doc Create a registered name atom for FLU sequencer internal %% rendezvous/message passing use. --spec make_regname(binary()|list()) -> +-spec make_regname(binary()|string()) -> atom(). make_regname(Prefix) when is_binary(Prefix) -> erlang:binary_to_atom(Prefix, latin1); @@ -231,13 +231,13 @@ make_tagged_csum(server_regen, SHA) -> %% @doc Log a verbose message. --spec verb(string()) -> term(). +-spec verb(string()) -> ok. verb(Fmt) -> verb(Fmt, []). %% @doc Log a verbose message. 
--spec verb(string(), list()) -> term(). +-spec verb(string(), list()) -> ok. verb(Fmt, Args) -> case application:get_env(kernel, verbose) of {ok, true} -> io:format(Fmt, Args); diff --git a/src/machi_yessir_client.erl b/src/machi_yessir_client.erl index c7e4872..d63aa2a 100644 --- a/src/machi_yessir_client.erl +++ b/src/machi_yessir_client.erl @@ -64,45 +64,15 @@ chunk_size }). --type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. --type chunk_bin() :: binary() | iolist(). % client can use either --type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum -%% -type chunk_summary() :: {file_offset(), chunk_size(), binary()}. --type chunk_s() :: binary(). % server always uses binary() --type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. --type chunk_size() :: non_neg_integer(). --type error_general() :: 'bad_arg' | 'wedged'. --type epoch_csum() :: binary(). --type epoch_num() :: -1 | non_neg_integer(). --type epoch_id() :: {epoch_num(), epoch_csum()}. --type file_info() :: {file_size(), file_name_s()}. --type file_name() :: binary() | list(). --type file_name_s() :: binary(). % server reply --type file_offset() :: non_neg_integer(). --type file_size() :: non_neg_integer(). --type file_prefix() :: binary() | list(). --type inet_host() :: inet:ip_address() | inet:hostname(). --type inet_port() :: inet:port_number(). --type port_wrap() :: #yessir{}. % yessir non-standard! --type projection() :: #projection_v1{}. --type projection_type() :: 'public' | 'private'. - --export_type([epoch_id/0]). - %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(inet_host(), inet_port(), - epoch_id(), file_prefix(), chunk()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try @@ -119,8 +89,6 @@ append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) -> %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}. append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, _EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> @@ -140,9 +108,6 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(inet_host(), inet_port(), - epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}. append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -154,10 +119,6 @@ append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | - {error, term()}. 
read_chunk(#yessir{name=Name}, _EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> case get({Name,offset,File}) of @@ -201,11 +162,6 @@ make_csum(Name,Size) -> %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(inet_host(), inet_port(), epoch_id(), - file_name(), file_offset(), chunk_size()) -> - {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | - {error, term()}. read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -217,10 +173,6 @@ read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size) %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(port_wrap(), epoch_id(), file_name()) -> - {ok, [chunk_csum()]} | - {error, error_general() | 'no_such_file' | 'partial_read'} | - {error, term()}. checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) -> case get({Name,offset,File}) of undefined -> @@ -234,9 +186,6 @@ checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) -> %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) -> - {ok, [chunk_csum()]} | - {error, error_general() | 'no_such_file'} | {error, term()}. checksum_list(_Host, _TcpPort, EpochID, File) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try @@ -247,16 +196,12 @@ checksum_list(_Host, _TcpPort, EpochID, File) -> %% @doc Fetch the list of all files on the remote FLU. --spec list_files(port_wrap(), epoch_id()) -> - {ok, [file_info()]} | {error, term()}. list_files(#yessir{name=Name}, _EpochID) -> Files = [{Offset, File} || {{N,offset,File}, Offset} <- get(), N == Name], {ok, Files}. %% @doc Fetch the list of all files on the remote FLU. --spec list_files(inet_host(), inet_port(), epoch_id()) -> - {ok, [file_info()]} | {error, term()}. list_files(_Host, _TcpPort, EpochID) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try @@ -267,16 +212,11 @@ list_files(_Host, _TcpPort, EpochID) -> %% @doc Fetch the wedge status from the remote FLU. --spec wedge_status(port_wrap()) -> - {ok, {boolean(), pv1_epoch()}} | {error, term()}. - wedge_status(_Sock) -> {ok, {false, ?DUMMY_PV1_EPOCH}}. %% @doc Fetch the wedge status from the remote FLU. --spec wedge_status(inet_host(), inet_port()) -> - {ok, {boolean(), pv1_epoch()}} | {error, term()}. wedge_status(_Host, _TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try @@ -287,8 +227,6 @@ wedge_status(_Host, _TcpPort) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(port_wrap(), projection_type()) -> - {ok, epoch_id()} | {error, term()}. get_latest_epoch(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> case read_latest_projection(Sock, ProjType) of @@ -298,9 +236,6 @@ get_latest_epoch(Sock, ProjType) %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(inet_host(), inet_port(), - projection_type()) -> - {ok, epoch_id()} | {error, term()}. get_latest_epoch(_Host, _TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -312,8 +247,6 @@ get_latest_epoch(_Host, _TcpPort, ProjType) %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(port_wrap(), projection_type()) -> - {ok, projection()} | {error, not_written} | {error, term()}. 
read_latest_projection(#yessir{name=Name}, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Ps = [P || {{N,proj,PT,_Epoch}, P} <- get(), N == Name, PT == ProjType], @@ -330,9 +263,6 @@ read_latest_projection(#yessir{name=Name}, ProjType) %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(inet_host(), inet_port(), - projection_type()) -> - {ok, projection()} | {error, not_written} | {error, term()}. read_latest_projection(_Host, _TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -344,8 +274,6 @@ read_latest_projection(_Host, _TcpPort, ProjType) %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(port_wrap(), projection_type(), epoch_num()) -> - {ok, projection()} | {error, not_written} | {error, term()}. read_projection(#yessir{name=Name}, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> case get({Name,proj,ProjType,Epoch}) of @@ -357,9 +285,6 @@ read_projection(#yessir{name=Name}, ProjType, Epoch) %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(inet_host(), inet_port(), - projection_type(), epoch_num()) -> - {ok, projection()} | {error, written} | {error, term()}. read_projection(_Host, _TcpPort, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -371,8 +296,6 @@ read_projection(_Host, _TcpPort, ProjType, Epoch) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(port_wrap(), projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. write_projection(#yessir{name=Name}=Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -387,9 +310,6 @@ write_projection(#yessir{name=Name}=Sock, ProjType, Proj) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(inet_host(), inet_port(), - projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. write_projection(_Host, _TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -402,8 +322,6 @@ write_projection(_Host, _TcpPort, ProjType, Proj) %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(port_wrap(), projection_type()) -> - {ok, [projection()]} | {error, term()}. get_all_projections(#yessir{name=Name}, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Ps = [Proj || {{N,proj,PT,_}, Proj} <- get(), N == Name, PT == ProjType], @@ -411,9 +329,6 @@ get_all_projections(#yessir{name=Name}, ProjType) %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(inet_host(), inet_port(), - projection_type()) -> - {ok, [projection()]} | {error, term()}. get_all_projections(_Host, _TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -425,8 +340,6 @@ get_all_projections(_Host, _TcpPort, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(port_wrap(), projection_type()) -> - {ok, [non_neg_integer()]} | {error, term()}. 
list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> case get_all_projections(Sock, ProjType) of @@ -436,9 +349,6 @@ list_all_projections(Sock, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(inet_host(), inet_port(), - projection_type()) -> - {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(_Host, _TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -450,8 +360,6 @@ list_all_projections(_Host, _TcpPort, ProjType) %% @doc Quit & close the connection to remote FLU. --spec quit(port_wrap()) -> - ok. quit(_) -> ok. @@ -460,8 +368,6 @@ quit(_) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, error_general()} | {error, term()}. write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Pos = case get({Name,offset,File}) of @@ -474,9 +380,6 @@ write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk) %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(inet_host(), inet_port(), - epoch_id(), file_name(), file_offset(), chunk()) -> - ok | {error, error_general()} | {error, term()}. write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -489,8 +392,6 @@ write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk) %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port_wrap(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(#yessir{name=Name}, _EpochID, File) -> case get({Name,offset,File}) of undefined -> @@ -503,8 +404,6 @@ delete_migration(#yessir{name=Name}, _EpochID, File) -> %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(_Host, _TcpPort, EpochID, File) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try @@ -516,16 +415,12 @@ delete_migration(_Host, _TcpPort, EpochID, File) -> %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port_wrap(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. trunc_hack(#yessir{name=Name}, _EpochID, File) -> put({Name,offset,File}, ?MINIMUM_OFFSET). %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> - ok | {error, error_general() | 'no_such_file'} | {error, term()}. 
trunc_hack(_Host, _TcpPort, EpochID, File) -> Sock = connect(#p_srvr{proto_mod=?MODULE}), try diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 6466294..e6db150 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -135,6 +135,29 @@ smoke_test2() -> %% Exactly one file right now {ok, [_]} = machi_cr_client:list_files(C1), + %% Go back and test append_chunk_extra() and write_chunk() + Chunk10 = <<"It's a different chunk!">>, + Size10 = byte_size(Chunk10), + Extra10 = 5, + {ok, {Off10,Size10,File10}} = + machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, + Extra10 * Size10), + {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10), + [begin + Offx = Off10 + (Seq * Size10), + %% TODO: uncomment when written/not_written enforcement is available. + %% {error,not_written} = machi_cr_client:read_chunk(C1, File10, + %% Offx, Size10), + {ok, {Offx,Size10,File10}} = + machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), + {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx, + Size10) + end || Seq <- lists:seq(1, Extra10)], + {ok, {Off11,Size11,File11}} = + machi_cr_client:append_chunk(C1, Prefix, Chunk10), + %% Double-check that our reserved extra bytes were really honored! + true = (Off11 > (Off10 + (Extra10 * Size10))), + ok after error_logger:tty(true), diff --git a/test/machi_pb_cr_client_test.erl b/test/machi_pb_cr_client_test.erl new file mode 100644 index 0000000..483ab9d --- /dev/null +++ b/test/machi_pb_cr_client_test.erl @@ -0,0 +1,79 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_pb_cr_client_test). +-compile(export_all). + +-ifdef(TEST). +-ifndef(PULSE). + +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(C, machi_pb_cr_client). + +smoke_test_() -> + {timeout, 5*60, fun() -> smoke_test2() end}.
+ +smoke_test2() -> + Port = 5720, + Ps = [{a,#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}} + ], + + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + {ok, SupPid} = machi_flu_sup:start_link(), + try + [begin + #p_srvr{name=Name, port=Port, props=Dir} = P, + {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) + end || {_,P} <- Ps], + [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + + {_, P_a} = hd(Ps), + {ok, Sock} = gen_tcp:connect(P_a#p_srvr.address, P_a#p_srvr.port, + [{packet, line}, binary, {active, false}]), + try + Prefix = <<"pre">>, + Chunk1 = <<"yochunk">>, + ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>), + {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0), + ok = inet:setopts(Sock, [{packet,4}]), + R1a = #mpb_request{req_id= <<0>>, + echo=#mpb_echoreq{message = <<"Hello, world!">>}}, + Bin1a = machi_pb:encode_mpb_request(R1a), + ok = gen_tcp:send(Sock, Bin1a), + {ok, Bin1B} = gen_tcp:recv(Sock, 0), + R1b = machi_pb:decode_mpb_response(Bin1B), + true = is_record(R1b, mpb_response), + + ok + after + (catch gen_tcp:close(Sock)) + end + after + exit(SupPid, normal), + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + machi_util:wait_for_death(SupPid, 100), + ok + end. + +-endif. % !PULSE +-endif. % TEST diff --git a/test/machi_pb_test.erl b/test/machi_pb_test.erl new file mode 100644 index 0000000..41a224f --- /dev/null +++ b/test/machi_pb_test.erl @@ -0,0 +1,67 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(machi_pb_test). + +-include("machi_pb.hrl"). + +-compile(export_all). + +-ifdef(TEST). +-ifndef(PULSE). + +-include_lib("eunit/include/eunit.hrl"). + +%% We don't expect any problems with these functions: nearly all +%% code executed is compiled from the PB definition file(s) and so +%% ought to be bug-free. Errors are likely to be caused by +%% changes in the spec but without corresponding changes to +%% message types/names here. + +smoke_requests_test() -> + Echo0 = #mpb_request{req_id= <<"x">>, + echo=#mpb_echoreq{}}, + Echo0 = encdec_request(Echo0), + Echo1 = #mpb_request{req_id= <<"x">>, + echo=#mpb_echoreq{message="Yo!"}}, + Echo1 = encdec_request(Echo1), + + ok. + +smoke_responses_test() -> + R1 = #mpb_response{req_id= <<"x">>, + generic=#mpb_errorresp{code=7, + msg="foo", + extra= <<"bar">>}}, + R1 = encdec_response(R1), + + ok. + +encdec_request(M) -> + machi_pb:decode_mpb_request( + list_to_binary(machi_pb:encode_mpb_request(M))). + +encdec_response(M) -> + machi_pb:decode_mpb_response( + list_to_binary(machi_pb:encode_mpb_response(M))). + +-endif. % !PULSE +-endif. % TEST
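%% A minimal client-side sketch (not part of the patch) of the new
%% "PROTOCOL-BUFFERS" wire mode that machi_flu1 now accepts: send the mode
%% line, wait for the "OK" ack, switch to 4-byte length framing, then
%% exchange encoded #mpb_request{}/#mpb_response{} records. The function
%% name `pb_echo_sketch' and the `Host'/`Port' arguments are assumptions;
%% the handshake and record handling mirror machi_pb_cr_client_test above,
%% and the #mpb_*{} records come from the generated include/machi_pb.hrl.
pb_echo_sketch(Host, Port) ->
    {ok, Sock} = gen_tcp:connect(Host, Port,
                                 [{packet, line}, binary, {active, false}]),
    %% Ask the FLU to switch this connection into protobuf mode.
    ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>),
    {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
    ok = inet:setopts(Sock, [{packet, 4}]),
    %% Round-trip a single echo request.
    Req = #mpb_request{req_id= <<"1">>,
                       echo=#mpb_echoreq{message= <<"Hello, world!">>}},
    ok = gen_tcp:send(Sock, machi_pb:encode_mpb_request(Req)),
    {ok, Bin} = gen_tcp:recv(Sock, 0),
    Resp = machi_pb:decode_mpb_response(Bin),
    ok = gen_tcp:close(Sock),
    Resp.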