Merge branch 'slf/pb-api-experiment1'

This commit is contained in:
Scott Lystig Fritchie 2015-06-21 15:10:54 +09:00
commit b65293f391
17 changed files with 617 additions and 222 deletions

4
.gitignore vendored
View file

@ -1,4 +1,5 @@
prototype/chain-manager/patch.* prototype/chain-manager/patch.*
.eqc-info
.eunit .eunit
deps deps
*.plt *.plt
@ -6,3 +7,6 @@ erl_crash.dump
.concrete/DEV_MODE .concrete/DEV_MODE
.rebar .rebar
edoc edoc
# PB artifacts for Erlang
include/machi_pb.hrl

View file

@ -37,13 +37,18 @@ PLT = $(HOME)/.machi_dialyzer_plt
build_plt: deps compile build_plt: deps compile
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin
DIALYZER_DEP_APPS = ebin/machi_pb.beam deps/protobuffs/ebin
DIALYZER_FLAGS = -Wno_return -Wrace_conditions -Wunderspecs
dialyzer: deps compile dialyzer: deps compile
dialyzer -Wno_return --plt $(PLT) ebin dialyzer $(DIALYZER_FLAGS) --plt $(PLT) ebin $(DIALYZER_DEP_APPS) | \
egrep -v -f ./filter-dialyzer-dep-warnings
dialyzer-test: deps compile dialyzer-test: deps compile
echo Force rebar to recompile .eunit dir w/o running tests > /dev/null echo Force rebar to recompile .eunit dir w/o running tests > /dev/null
rebar skip_deps=true eunit suite=lamport_clock rebar skip_deps=true eunit suite=lamport_clock
dialyzer -Wno_return --plt $(PLT) .eunit dialyzer $(DIALYZER_FLAGS) --plt $(PLT) .eunit $(DIALYZER_DEP_APPS) | \
egrep -v -f ./filter-dialyzer-dep-warnings
clean_plt: clean_plt:
rm $(PLT) rm $(PLT)

View file

@ -0,0 +1,12 @@
####################### patterns for general errors in dep modules:
^protobuffs\.erl:
^protobuffs_[a-z_]*\.erl:
^leexinc\.hrl:[0-9][0-9]*:
^machi_chain_manager1.erl:[0-9][0-9]*: Guard test RetrospectiveP::'false' =:= 'true' can never succeed
^machi_pb\.erl:[0-9][0-9]*:
^pokemon_pb\.erl:[0-9][0-9]*:
####################### patterns for unknown functions:
^ basho_bench_config:get/2
^ erl_prettypr:format/1
^ erl_syntax:form_list/1
^ machi_partition_simulator:get/1

BIN
rebar vendored

Binary file not shown.

View file

@ -5,6 +5,7 @@
{edoc_opts, [{dir, "./edoc"}]}. {edoc_opts, [{dir, "./edoc"}]}.
{deps, [ {deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}} {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}},
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}
]}. ]}.

185
src/machi.proto Normal file
View file

@ -0,0 +1,185 @@
/* -------------------------------------------------------------------
**
** machi.proto: Protocol Buffers definition for Machi
**
** Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
**
** This file is provided to you under the Apache License,
** Version 2.0 (the "License"); you may not use this file
** except in compliance with the License. You may obtain
** a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing,
** software distributed under the License is distributed on an
** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
** KIND, either express or implied. See the License for the
** specific language governing permissions and limitations
** under the License.
**
** -------------------------------------------------------------------
*/
/*
** Revision: 0.1
*/
// Java package specifiers
option java_package = "com.basho.machi.protobuf";
option java_outer_classname = "MachiPB";
//////////////////////////////////////////
//
// enums
//
//////////////////////////////////////////
enum Mpb_GeneralStatusCode {
OK = 0;
BAD_ARG = 1;
WEDGED = 2;
BAD_CHECKSUM = 3;
PARTITION = 4;
}
// Must match with machi.hrl's values!
enum Mpb_CSumType {
CSUM_TAG_NONE = 0;
CSUM_TAG_CLIENT_GEN = 1;
CSUM_TAG_SERVER_GEN = 2;
CSUM_TAG_SERVER_REGEN = 3;
}
//////////////////////////////////////////
//
// basic data types
//
//////////////////////////////////////////
// chunk_pos() type
message Mpb_ChunkPos {
required uint64 offset = 1;
required uint64 chunk_size = 2;
required string file_name = 3;
}
// chunk_csum() type
message Mpb_ChunkCSum {
required Mpb_CSumType type = 1;
required bytes csum = 2;
}
// epoch_id() type
message Mpb_EpochId {
required uint32 epoch_num = 1;
required Mpb_ChunkCSum epoch_csum = 2;
}
//////////////////////////////////////////
//
// requests & responses
//
//////////////////////////////////////////
// Error response - may be generated for any Req
message Mpb_ErrorResp {
// Free-form (depends on server, which is probably a bad idea TODO)
required int32 code = 1;
required string msg = 2;
optional bytes extra = 3;
}
// ping() request & response
message Mpb_EchoReq {
optional string message = 1;
}
message Mpb_EchoResp {
optional string message = 1;
}
// Authentication request & response
message Mpb_AuthReq {
required bytes user = 1;
required bytes password = 2;
}
message Mpb_AuthResp {
required uint32 code = 1;
// TODO: not implemented yet
}
// append_chunk() request & response
message Mpb_AppendChunkReq {
required string prefix = 1;
optional bytes placement_key = 2;
required bytes chunk = 3;
optional uint32 chunk_extra = 4;
}
message Mpb_AppendChunkResp {
required Mpb_GeneralStatusCode status = 1;
// If OK, then chunk_pos is defined.
optional Mpb_ChunkPos chunk_pos = 2;
}
// write_chunk() request & response
message Mpb_WriteChunkReq {
required string file = 1;
required uint64 offset = 2;
required bytes chunk = 3;
}
message Mpb_WriteChunkResp {
required Mpb_GeneralStatusCode status = 1;
// If OK, then chunk_pos is defined.
optional Mpb_ChunkPos chunk_pos = 2;
}
//////////////////////////////////////////
//
// request & response wrapper
//
//////////////////////////////////////////
message Mpb_Request {
// TODO: If we wish to support pipelined requests sometime in the
// future, this is the placeholder to do it.
required bytes req_id = 1;
// The client should only define one request message. If the client
// includes multiple requests here, the server may pick/choose an
// arbitrary one.
// NOTE: The erlang protobuffs compiler doesn't support 'oneof'.
// But 'oneof' appears to be a very tiny memory optimization
// that not all languages might care about? (Erlang doesn't)
optional Mpb_EchoReq echo = 10;
optional Mpb_AuthReq auth = 11;
optional Mpb_AppendChunkReq append_chunk = 12;
optional Mpb_WriteChunkReq write_chunk = 13;
}
message Mpb_Response {
// TODO: If we wish to support pipelined requests sometime in the
// future, this is the placeholder to do it.
required bytes req_id = 1;
// The server will define only one of the optional responses below.
// Generic error response, typically used when something quite
// bad/unexpected happened within the server.
// Clients should always check this response and, if defined,
// ignroe any request-specific response at codes 10+.
optional Mpb_ErrorResp generic = 2;
// Specific responses.
optional Mpb_EchoResp echo = 10;
optional Mpb_AuthResp auth = 11;
optional Mpb_AppendChunkResp append_chunk = 12;
optional Mpb_WriteChunkResp write_chunk = 13;
}

View file

@ -22,10 +22,6 @@
-module(machi_admin_util). -module(machi_admin_util).
%% TODO Move these types to a common header file? (also machi_flu1_client.erl?)
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-export([ -export([
verify_file_checksums_local/3, verify_file_checksums_local/4, verify_file_checksums_local/3, verify_file_checksums_local/4,
verify_file_checksums_remote/3, verify_file_checksums_remote/4 verify_file_checksums_remote/3, verify_file_checksums_remote/4
@ -37,13 +33,13 @@
-define(FLU_C, machi_flu1_client). -define(FLU_C, machi_flu1_client).
-spec verify_file_checksums_local(port(), machi_flu1_client:epoch_id(), binary()|list()) -> -spec verify_file_checksums_local(port(), machi_dt:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}. {ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) -> verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
verify_file_checksums_local2(Sock1, EpochID, Path). verify_file_checksums_local2(Sock1, EpochID, Path).
-spec verify_file_checksums_local(inet_host(), inet_port(), -spec verify_file_checksums_local(machi_dt:inet_host(), machi_dt:inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) -> machi_dt:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}. {ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
@ -53,13 +49,13 @@ verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
catch ?FLU_C:disconnect(Sock1) catch ?FLU_C:disconnect(Sock1)
end. end.
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) -> -spec verify_file_checksums_remote(port(), machi_dt:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}. {ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) -> verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
verify_file_checksums_remote2(Sock1, EpochID, File). verify_file_checksums_remote2(Sock1, EpochID, File).
-spec verify_file_checksums_remote(inet_host(), inet_port(), -spec verify_file_checksums_remote(machi_dt:inet_host(), machi_dt:inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) -> machi_dt:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}. {ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, EpochID, File) -> verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),

View file

@ -26,7 +26,7 @@
%% use basho_bench to measure its performance under a certain %% use basho_bench to measure its performance under a certain
%% workload. Machi is a bit different than most KV stores in that the %% workload. Machi is a bit different than most KV stores in that the
%% client has no direct control over the keys -- Machi servers always %% client has no direct control over the keys -- Machi servers always
%% assign the keys. The schemes typically used by basho_bench & YCSB %% assign the keys. The schemes typically used by basho_bench & YCSB
%% to use/mimic key naming conventions used internally ... are %% to use/mimic key naming conventions used internally ... are
%% difficult to adapt to Machi. %% difficult to adapt to Machi.
%% %%
@ -50,7 +50,7 @@
%% %%
%% TODO: As an alternate idea, if we know that the chunks written are %% TODO: As an alternate idea, if we know that the chunks written are
%% always the same size, and if we don't care about CRC checking, then %% always the same size, and if we don't care about CRC checking, then
%% all we need to know are the file names & file sizes on the server: %% all we need to know are the file names & file sizes on the server:
%% we can then pick any valid offset within that file. That would %% we can then pick any valid offset within that file. That would
%% certainly be more scalable than the zillion-row-ETS-table, which is %% certainly be more scalable than the zillion-row-ETS-table, which is
%% definitely RAM-hungry. %% definitely RAM-hungry.
@ -67,9 +67,9 @@
-define(ETS_TAB, machi_keys). -define(ETS_TAB, machi_keys).
-define(THE_TIMEOUT, 60*1000). -define(THE_TIMEOUT, 60*1000).
-define(INFO(Str, Args), lager:info(Str, Args)). -define(INFO(Str, Args), (_ = lager:info(Str, Args))).
-define(WARN(Str, Args), lager:warning(Str, Args)). -define(WARN(Str, Args), (_ = lager:warning(Str, Args))).
-define(ERROR(Str, Args), lager:error(Str, Args)). -define(ERROR(Str, Args), (_ = lager:error(Str, Args))).
new(Id) -> new(Id) ->
Ps = find_server_info(Id), Ps = find_server_info(Id),

View file

@ -119,6 +119,7 @@
%% File API %% File API
append_chunk/3, append_chunk/4, append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5, append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5, read_chunk/4, read_chunk/5,
checksum_list/2, checksum_list/3, checksum_list/2, checksum_list/3,
list_files/1, list_files/2, list_files/1, list_files/2,
@ -180,6 +181,19 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
Chunk, ChunkExtra}}, Chunk, ChunkExtra}},
Timeout). Timeout).
%% @doc Write a chunk of data (that has already been
%% allocated/sequenced by an earlier append_chunk_extra() call) to
%% `File' at `Offset'.
write_chunk(PidSpec, File, Offset, Chunk) ->
write_chunk(PidSpec, File, Offset, Chunk, ?DEFAULT_TIMEOUT).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
write_chunk(PidSpec, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size) -> read_chunk(PidSpec, File, Offset, Size) ->
@ -252,6 +266,8 @@ code_change(_OldVsn, S, _Extra) ->
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
handle_call2({write_chunk, File, Offset, Chunk}, _From, S) ->
do_write_head(File, Offset, Chunk, 0, os:timestamp(), S);
handle_call2({read_chunk, File, Offset, Size}, _From, S) -> handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); do_read_chunk(File, Offset, Size, 0, os:timestamp(), S);
handle_call2({checksum_list, File}, _From, S) -> handle_call2({checksum_list, File}, _From, S) ->
@ -338,8 +354,13 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
%% over with a new sequencer assignment, at %% over with a new sequencer assignment, at
%% the 2nd have of the impl (we have already %% the 2nd have of the impl (we have already
%% slept & refreshed the projection). %% slept & refreshed the projection).
do_append_head2(Prefix, Chunk, ChunkExtra, Depth,
STime, S2); if Prefix == undefined -> % atom! not binary()!!
{error, partition};
true ->
do_append_head2(Prefix, Chunk, ChunkExtra,
Depth, STime, S2)
end;
RestFLUs3 -> RestFLUs3 ->
do_append_midtail2(RestFLUs3, Prefix, File, Offset, do_append_midtail2(RestFLUs3, Prefix, File, Offset,
Chunk, ChunkExtra, Chunk, ChunkExtra,
@ -378,6 +399,51 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
end. end.
do_write_head(File, Offset, Chunk, 0=Depth, STime, S) ->
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S);
do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "head sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_write_head(File, Offset, Chunk, Depth + 1, STime, S2);
_ ->
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2)
end
end.
do_write_head2(File, Offset, Chunk, Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
%% From this point onward, we use the same code & logic path as
%% append does.
do_append_midtail(RestFLUs, undefined, File, Offset, Chunk,
undefined, [HeadFLU], 0, STime, S);
{error, bad_checksum}=BadCS ->
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_write_head(File, Offset, Chunk, Depth, STime, S);
{error, written}=Err ->
Err;
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,
iolist_size(Chunk)})
end.
do_read_chunk(File, Offset, Size, 0=Depth, STime, do_read_chunk(File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);

70
src/machi_dt.erl Normal file
View file

@ -0,0 +1,70 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_dt).
-include("machi_projection.hrl").
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
-type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: float().%%%binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
-export_type([
chunk/0,
chunk_bin/0,
chunk_csum/0,
chunk_summary/0,
chunk_s/0,
chunk_pos/0,
chunk_size/0,
error_general/0,
epoch_csum/0,
epoch_num/0,
epoch_id/0,
file_info/0,
file_name/0,
file_name_s/0,
file_offset/0,
file_size/0,
file_prefix/0,
inet_host/0,
inet_port/0,
projection/0,
projection_type/0
]).

View file

@ -75,6 +75,7 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-include("machi.hrl"). -include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl"). -include("machi_projection.hrl").
-define(SERVER_CMD_READ_TIMEOUT, 600*1000). -define(SERVER_CMD_READ_TIMEOUT, 600*1000).
@ -91,7 +92,7 @@
data_dir :: string(), data_dir :: string(),
wedged = true :: boolean(), wedged = true :: boolean(),
etstab :: ets:tid(), etstab :: ets:tid(),
epoch_id :: 'undefined' | pv1_epoch(), epoch_id :: 'undefined' | machi_dt:epoch_id(),
dbg_props = [] :: list(), % proplist dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist props = [] :: list() % proplist
}). }).
@ -343,6 +344,10 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
http_server_hack(FluName, PutLine, Sock, S); http_server_hack(FluName, PutLine, Sock, S);
<<"GET ", _/binary>>=PutLine -> <<"GET ", _/binary>>=PutLine ->
http_server_hack(FluName, PutLine, Sock, S); http_server_hack(FluName, PutLine, Sock, S);
<<"PROTOCOL-BUFFERS\n">> ->
ok = gen_tcp:send(Sock, <<"OK\n">>),
ok = inet:setopts(Sock, [{packet, 4}]),
protocol_buffers_loop(Sock, S);
_ -> _ ->
machi_util:verb("Else Got: ~p\n", [Line]), machi_util:verb("Else Got: ~p\n", [Line]),
io:format(user, "TODO: Else Got: ~p\n", [Line]), io:format(user, "TODO: Else Got: ~p\n", [Line]),
@ -1009,6 +1014,20 @@ http_harvest_headers({ok, Hdr}, Sock, Acc) ->
http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
Sock, [Hdr|Acc]). Sock, [Hdr|Acc]).
protocol_buffers_loop(Sock, S) ->
case gen_tcp:recv(Sock, 0) of
{ok, _Bin} ->
R = #mpb_response{req_id= <<"not paying any attention">>,
generic=#mpb_errorresp{code=-6,
msg="not implemented"}},
Resp = machi_pb:encode_mpb_response(R),
ok = gen_tcp:send(Sock, Resp),
protocol_buffers_loop(Sock, S);
{error, _} ->
(catch gen_tcp:close(Sock)),
exit(normal)
end.
digest_header_goop([], G) -> digest_header_goop([], G) ->
G; G;
digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) -> digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) ->

View file

@ -85,47 +85,22 @@
trunc_hack/3, trunc_hack/4 trunc_hack/3, trunc_hack/4
]). ]).
%% TODO: Hrm, this kind of API use ... is it a bad idea? We really want to
%% encourage client-side checksums; thus it ought to be dead easy.
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
-type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: {w,atom(),term()}. -type port_wrap() :: {w,atom(),term()}.
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
-export_type([epoch_id/0]).
%% @doc Append a chunk (binary- or iolist-style) of data to a file %% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'. %% with `Prefix'.
-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) -> -spec append_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}. {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk, 0). append_chunk2(Sock, EpochID, Prefix, Chunk, 0).
%% @doc Append a chunk (binary- or iolist-style) of data to a file %% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'. %% with `Prefix'.
-spec append_chunk(inet_host(), inet_port(), -spec append_chunk(machi_dt:inet_host(), machi_dt:inet_port(),
epoch_id(), file_prefix(), chunk()) -> machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}. {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
@ -142,8 +117,8 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
%% be reserved by the file sequencer for later write(s) by the %% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API. %% `write_chunk()' API.
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> -spec append_chunk_extra(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}. {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}.
append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 -> when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra). append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra).
@ -156,9 +131,9 @@ append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
%% be reserved by the file sequencer for later write(s) by the %% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API. %% `write_chunk()' API.
-spec append_chunk_extra(inet_host(), inet_port(), -spec append_chunk_extra(machi_dt:inet_host(), machi_dt:inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) -> machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}. {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}.
append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 -> when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -170,9 +145,9 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> -spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
{ok, chunk_s()} | {ok, machi_dt:chunk_s()} |
{error, error_general() | 'not_written' | 'partial_read'} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}. {error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size) read_chunk(Sock, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
@ -180,10 +155,10 @@ read_chunk(Sock, EpochID, File, Offset, Size)
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(inet_host(), inet_port(), epoch_id(), -spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(),
file_name(), file_offset(), chunk_size()) -> machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
{ok, chunk_s()} | {ok, machi_dt:chunk_s()} |
{error, error_general() | 'not_written' | 'partial_read'} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}. {error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size) read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
@ -196,18 +171,18 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port_wrap(), epoch_id(), file_name()) -> -spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
{ok, [chunk_summary()]} | {ok, [machi_dt:chunk_summary()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} | {error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}. {error, term()}.
checksum_list(Sock, EpochID, File) -> checksum_list(Sock, EpochID, File) ->
checksum_list2(Sock, EpochID, File). checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) -> -spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
{ok, [chunk_summary()]} | {ok, [machi_dt:chunk_summary()]} |
{error, error_general() | 'no_such_file'} | {error, term()}. {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
@ -218,15 +193,15 @@ checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%% @doc Fetch the list of all files on the remote FLU. %% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port_wrap(), epoch_id()) -> -spec list_files(port_wrap(), machi_dt:epoch_id()) ->
{ok, [file_info()]} | {error, term()}. {ok, [machi_dt:file_info()]} | {error, term()}.
list_files(Sock, EpochID) -> list_files(Sock, EpochID) ->
list2(Sock, EpochID). list2(Sock, EpochID).
%% @doc Fetch the list of all files on the remote FLU. %% @doc Fetch the list of all files on the remote FLU.
-spec list_files(inet_host(), inet_port(), epoch_id()) -> -spec list_files(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id()) ->
{ok, [file_info()]} | {error, term()}. {ok, [machi_dt:file_info()]} | {error, term()}.
list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
@ -238,15 +213,15 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
%% @doc Fetch the wedge status from the remote FLU. %% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port_wrap()) -> -spec wedge_status(port_wrap()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}. {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}.
wedge_status(Sock) -> wedge_status(Sock) ->
wedge_status2(Sock). wedge_status2(Sock).
%% @doc Fetch the wedge status from the remote FLU. %% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(inet_host(), inet_port()) -> -spec wedge_status(machi_dt:inet_host(), machi_dt:inet_port()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}. {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}.
wedge_status(Host, TcpPort) when is_integer(TcpPort) -> wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
@ -257,16 +232,16 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store. %% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epochid(port_wrap(), projection_type()) -> -spec get_latest_epochid(port_wrap(), machi_dt:projection_type()) ->
{ok, epoch_id()} | {error, term()}. {ok, machi_dt:epoch_id()} | {error, term()}.
get_latest_epochid(Sock, ProjType) get_latest_epochid(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
get_latest_epochid2(Sock, ProjType). get_latest_epochid2(Sock, ProjType).
%% @doc Get the latest epoch number + checksum from the FLU's projection store. %% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epochid(inet_host(), inet_port(), projection_type()) -> -spec get_latest_epochid(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) ->
{ok, epoch_id()} | {error, term()}. {ok, machi_dt:epoch_id()} | {error, term()}.
get_latest_epochid(Host, TcpPort, ProjType) get_latest_epochid(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -278,17 +253,17 @@ get_latest_epochid(Host, TcpPort, ProjType)
%% @doc Get the latest projection from the FLU's projection store for `ProjType' %% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port_wrap(), projection_type()) -> -spec read_latest_projection(port_wrap(), machi_dt:projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}. {ok, machi_dt:projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Sock, ProjType) read_latest_projection(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
read_latest_projection2(Sock, ProjType). read_latest_projection2(Sock, ProjType).
%% @doc Get the latest projection from the FLU's projection store for `ProjType' %% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(inet_host(), inet_port(), -spec read_latest_projection(machi_dt:inet_host(), machi_dt:inet_port(),
projection_type()) -> machi_dt:projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}. {ok, machi_dt:projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Host, TcpPort, ProjType) read_latest_projection(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -300,17 +275,17 @@ read_latest_projection(Host, TcpPort, ProjType)
%% @doc Read a projection `Proj' of type `ProjType'. %% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port_wrap(), projection_type(), epoch_num()) -> -spec read_projection(port_wrap(), machi_dt:projection_type(), machi_dt:epoch_num()) ->
{ok, projection()} | {error, not_written} | {error, term()}. {ok, machi_dt:projection()} | {error, not_written} | {error, term()}.
read_projection(Sock, ProjType, Epoch) read_projection(Sock, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
read_projection2(Sock, ProjType, Epoch). read_projection2(Sock, ProjType, Epoch).
%% @doc Read a projection `Proj' of type `ProjType'. %% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(inet_host(), inet_port(), -spec read_projection(machi_dt:inet_host(), machi_dt:inet_port(),
projection_type(), epoch_num()) -> machi_dt:projection_type(), machi_dt:epoch_num()) ->
{ok, projection()} | {error, not_written} | {error, term()}. {ok, machi_dt:projection()} | {error, not_written} | {error, term()}.
read_projection(Host, TcpPort, ProjType, Epoch) read_projection(Host, TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -322,7 +297,7 @@ read_projection(Host, TcpPort, ProjType, Epoch)
%% @doc Write a projection `Proj' of type `ProjType'. %% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port_wrap(), projection_type(), projection()) -> -spec write_projection(port_wrap(), machi_dt:projection_type(), machi_dt:projection()) ->
'ok' | {error, 'written'} | {error, term()}. 'ok' | {error, 'written'} | {error, term()}.
write_projection(Sock, ProjType, Proj) write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private', when ProjType == 'public' orelse ProjType == 'private',
@ -331,8 +306,8 @@ write_projection(Sock, ProjType, Proj)
%% @doc Write a projection `Proj' of type `ProjType'. %% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(inet_host(), inet_port(), -spec write_projection(machi_dt:inet_host(), machi_dt:inet_port(),
projection_type(), projection()) -> machi_dt:projection_type(), machi_dt:projection()) ->
'ok' | {error, 'written'} | {error, term()}. 'ok' | {error, 'written'} | {error, term()}.
write_projection(Host, TcpPort, ProjType, Proj) write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private', when ProjType == 'public' orelse ProjType == 'private',
@ -346,17 +321,16 @@ write_projection(Host, TcpPort, ProjType, Proj)
%% @doc Get all projections from the FLU's projection store. %% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(port_wrap(), projection_type()) -> -spec get_all_projections(port_wrap(), machi_dt:projection_type()) ->
{ok, [projection()]} | {error, term()}. {ok, [machi_dt:projection()]} | {error, term()}.
get_all_projections(Sock, ProjType) get_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
get_all_projections2(Sock, ProjType). get_all_projections2(Sock, ProjType).
%% @doc Get all projections from the FLU's projection store. %% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(inet_host(), inet_port(), -spec get_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) ->
projection_type()) -> {ok, [machi_dt:projection()]} | {error, term()}.
{ok, [projection()]} | {error, term()}.
get_all_projections(Host, TcpPort, ProjType) get_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -368,7 +342,7 @@ get_all_projections(Host, TcpPort, ProjType)
%% @doc Get all epoch numbers from the FLU's projection store. %% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(port_wrap(), projection_type()) -> -spec list_all_projections(port_wrap(), machi_dt:projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}. {ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType) list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
@ -376,8 +350,7 @@ list_all_projections(Sock, ProjType)
%% @doc Get all epoch numbers from the FLU's projection store. %% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(inet_host(), inet_port(), -spec list_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) ->
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}. {ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Host, TcpPort, ProjType) list_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
@ -419,8 +392,8 @@ disconnect(_) ->
%% @doc Restricted API: Write a chunk of already-sequenced data to %% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'. %% `File' at `Offset'.
-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) -> -spec write_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) ->
ok | {error, error_general()} | {error, term()}. ok | {error, machi_dt:error_general()} | {error, term()}.
write_chunk(Sock, EpochID, File, Offset, Chunk) write_chunk(Sock, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET -> when Offset >= ?MINIMUM_OFFSET ->
write_chunk2(Sock, EpochID, File, Offset, Chunk). write_chunk2(Sock, EpochID, File, Offset, Chunk).
@ -428,9 +401,9 @@ write_chunk(Sock, EpochID, File, Offset, Chunk)
%% @doc Restricted API: Write a chunk of already-sequenced data to %% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'. %% `File' at `Offset'.
-spec write_chunk(inet_host(), inet_port(), -spec write_chunk(machi_dt:inet_host(), machi_dt:inet_port(),
epoch_id(), file_name(), file_offset(), chunk()) -> machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) ->
ok | {error, error_general()} | {error, term()}. ok | {error, machi_dt:error_general()} | {error, term()}.
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET -> when Offset >= ?MINIMUM_OFFSET ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -443,16 +416,16 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
%% @doc Restricted API: Delete a file after it has been successfully %% @doc Restricted API: Delete a file after it has been successfully
%% migrated. %% migrated.
-spec delete_migration(port_wrap(), epoch_id(), file_name()) -> -spec delete_migration(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}. ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Sock, EpochID, File) -> delete_migration(Sock, EpochID, File) ->
delete_migration2(Sock, EpochID, File). delete_migration2(Sock, EpochID, File).
%% @doc Restricted API: Delete a file after it has been successfully %% @doc Restricted API: Delete a file after it has been successfully
%% migrated. %% migrated.
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> -spec delete_migration(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}. ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
@ -464,16 +437,16 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%% @doc Restricted API: Truncate a file after it has been successfully %% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded. %% erasure coded.
-spec trunc_hack(port_wrap(), epoch_id(), file_name()) -> -spec trunc_hack(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}. ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Sock, EpochID, File) -> trunc_hack(Sock, EpochID, File) ->
trunc_hack2(Sock, EpochID, File). trunc_hack2(Sock, EpochID, File).
%% @doc Restricted API: Truncate a file after it has been successfully %% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded. %% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> -spec trunc_hack(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}. ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try

View file

@ -48,7 +48,7 @@
%% @doc Create a registered name atom for FLU sequencer internal %% @doc Create a registered name atom for FLU sequencer internal
%% rendezvous/message passing use. %% rendezvous/message passing use.
-spec make_regname(binary()|list()) -> -spec make_regname(binary()|string()) ->
atom(). atom().
make_regname(Prefix) when is_binary(Prefix) -> make_regname(Prefix) when is_binary(Prefix) ->
erlang:binary_to_atom(Prefix, latin1); erlang:binary_to_atom(Prefix, latin1);
@ -231,13 +231,13 @@ make_tagged_csum(server_regen, SHA) ->
%% @doc Log a verbose message. %% @doc Log a verbose message.
-spec verb(string()) -> term(). -spec verb(string()) -> ok.
verb(Fmt) -> verb(Fmt) ->
verb(Fmt, []). verb(Fmt, []).
%% @doc Log a verbose message. %% @doc Log a verbose message.
-spec verb(string(), list()) -> term(). -spec verb(string(), list()) -> ok.
verb(Fmt, Args) -> verb(Fmt, Args) ->
case application:get_env(kernel, verbose) of case application:get_env(kernel, verbose) of
{ok, true} -> io:format(Fmt, Args); {ok, true} -> io:format(Fmt, Args);

View file

@ -64,45 +64,15 @@
chunk_size chunk_size
}). }).
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
%% -type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: #yessir{}. % yessir non-standard!
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
-export_type([epoch_id/0]).
%% @doc Append a chunk (binary- or iolist-style) of data to a file %% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'. %% with `Prefix'.
-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0). append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0).
%% @doc Append a chunk (binary- or iolist-style) of data to a file %% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'. %% with `Prefix'.
-spec append_chunk(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) -> append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try
@ -119,8 +89,6 @@ append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) ->
%% be reserved by the file sequencer for later write(s) by the %% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API. %% `write_chunk()' API.
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}.
append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
_EpochID, Prefix, Chunk, ChunkExtra) _EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 -> when is_integer(ChunkExtra), ChunkExtra >= 0 ->
@ -140,9 +108,6 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
%% be reserved by the file sequencer for later write(s) by the %% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API. %% `write_chunk()' API.
-spec append_chunk_extra(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}.
append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra) append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 -> when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -154,10 +119,6 @@ append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(#yessir{name=Name}, _EpochID, File, Offset, Size) read_chunk(#yessir{name=Name}, _EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
case get({Name,offset,File}) of case get({Name,offset,File}) of
@ -201,11 +162,6 @@ make_csum(Name,Size) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size) read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -217,10 +173,6 @@ read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) -> checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
case get({Name,offset,File}) of case get({Name,offset,File}) of
undefined -> undefined ->
@ -234,9 +186,6 @@ checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(_Host, _TcpPort, EpochID, File) -> checksum_list(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try
@ -247,16 +196,12 @@ checksum_list(_Host, _TcpPort, EpochID, File) ->
%% @doc Fetch the list of all files on the remote FLU. %% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port_wrap(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(#yessir{name=Name}, _EpochID) -> list_files(#yessir{name=Name}, _EpochID) ->
Files = [{Offset, File} || {{N,offset,File}, Offset} <- get(), N == Name], Files = [{Offset, File} || {{N,offset,File}, Offset} <- get(), N == Name],
{ok, Files}. {ok, Files}.
%% @doc Fetch the list of all files on the remote FLU. %% @doc Fetch the list of all files on the remote FLU.
-spec list_files(inet_host(), inet_port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(_Host, _TcpPort, EpochID) -> list_files(_Host, _TcpPort, EpochID) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try
@ -267,16 +212,11 @@ list_files(_Host, _TcpPort, EpochID) ->
%% @doc Fetch the wedge status from the remote FLU. %% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port_wrap()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(_Sock) -> wedge_status(_Sock) ->
{ok, {false, ?DUMMY_PV1_EPOCH}}. {ok, {false, ?DUMMY_PV1_EPOCH}}.
%% @doc Fetch the wedge status from the remote FLU. %% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(inet_host(), inet_port()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(_Host, _TcpPort) -> wedge_status(_Host, _TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try
@ -287,8 +227,6 @@ wedge_status(_Host, _TcpPort) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store. %% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType) get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
case read_latest_projection(Sock, ProjType) of case read_latest_projection(Sock, ProjType) of
@ -298,9 +236,6 @@ get_latest_epoch(Sock, ProjType)
%% @doc Get the latest epoch number + checksum from the FLU's projection store. %% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(_Host, _TcpPort, ProjType) get_latest_epoch(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -312,8 +247,6 @@ get_latest_epoch(_Host, _TcpPort, ProjType)
%% @doc Get the latest projection from the FLU's projection store for `ProjType' %% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port_wrap(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(#yessir{name=Name}, ProjType) read_latest_projection(#yessir{name=Name}, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Ps = [P || {{N,proj,PT,_Epoch}, P} <- get(), N == Name, PT == ProjType], Ps = [P || {{N,proj,PT,_Epoch}, P} <- get(), N == Name, PT == ProjType],
@ -330,9 +263,6 @@ read_latest_projection(#yessir{name=Name}, ProjType)
%% @doc Get the latest projection from the FLU's projection store for `ProjType' %% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(inet_host(), inet_port(),
projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(_Host, _TcpPort, ProjType) read_latest_projection(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -344,8 +274,6 @@ read_latest_projection(_Host, _TcpPort, ProjType)
%% @doc Read a projection `Proj' of type `ProjType'. %% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port_wrap(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_projection(#yessir{name=Name}, ProjType, Epoch) read_projection(#yessir{name=Name}, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
case get({Name,proj,ProjType,Epoch}) of case get({Name,proj,ProjType,Epoch}) of
@ -357,9 +285,6 @@ read_projection(#yessir{name=Name}, ProjType, Epoch)
%% @doc Read a projection `Proj' of type `ProjType'. %% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(inet_host(), inet_port(),
projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
read_projection(_Host, _TcpPort, ProjType, Epoch) read_projection(_Host, _TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -371,8 +296,6 @@ read_projection(_Host, _TcpPort, ProjType, Epoch)
%% @doc Write a projection `Proj' of type `ProjType'. %% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port_wrap(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(#yessir{name=Name}=Sock, ProjType, Proj) write_projection(#yessir{name=Name}=Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private', when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) -> is_record(Proj, projection_v1) ->
@ -387,9 +310,6 @@ write_projection(#yessir{name=Name}=Sock, ProjType, Proj)
%% @doc Write a projection `Proj' of type `ProjType'. %% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(inet_host(), inet_port(),
projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(_Host, _TcpPort, ProjType, Proj) write_projection(_Host, _TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private', when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) -> is_record(Proj, projection_v1) ->
@ -402,8 +322,6 @@ write_projection(_Host, _TcpPort, ProjType, Proj)
%% @doc Get all projections from the FLU's projection store. %% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(port_wrap(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(#yessir{name=Name}, ProjType) get_all_projections(#yessir{name=Name}, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Ps = [Proj || {{N,proj,PT,_}, Proj} <- get(), N == Name, PT == ProjType], Ps = [Proj || {{N,proj,PT,_}, Proj} <- get(), N == Name, PT == ProjType],
@ -411,9 +329,6 @@ get_all_projections(#yessir{name=Name}, ProjType)
%% @doc Get all projections from the FLU's projection store. %% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(_Host, _TcpPort, ProjType) get_all_projections(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -425,8 +340,6 @@ get_all_projections(_Host, _TcpPort, ProjType)
%% @doc Get all epoch numbers from the FLU's projection store. %% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(port_wrap(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType) list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
case get_all_projections(Sock, ProjType) of case get_all_projections(Sock, ProjType) of
@ -436,9 +349,6 @@ list_all_projections(Sock, ProjType)
%% @doc Get all epoch numbers from the FLU's projection store. %% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(_Host, _TcpPort, ProjType) list_all_projections(_Host, _TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' -> when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -450,8 +360,6 @@ list_all_projections(_Host, _TcpPort, ProjType)
%% @doc Quit &amp; close the connection to remote FLU. %% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port_wrap()) ->
ok.
quit(_) -> quit(_) ->
ok. ok.
@ -460,8 +368,6 @@ quit(_) ->
%% @doc Restricted API: Write a chunk of already-sequenced data to %% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'. %% `File' at `Offset'.
-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk) write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET -> when Offset >= ?MINIMUM_OFFSET ->
Pos = case get({Name,offset,File}) of Pos = case get({Name,offset,File}) of
@ -474,9 +380,6 @@ write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk)
%% @doc Restricted API: Write a chunk of already-sequenced data to %% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'. %% `File' at `Offset'.
-spec write_chunk(inet_host(), inet_port(),
epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk) write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET -> when Offset >= ?MINIMUM_OFFSET ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -489,8 +392,6 @@ write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk)
%% @doc Restricted API: Delete a file after it has been successfully %% @doc Restricted API: Delete a file after it has been successfully
%% migrated. %% migrated.
-spec delete_migration(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(#yessir{name=Name}, _EpochID, File) -> delete_migration(#yessir{name=Name}, _EpochID, File) ->
case get({Name,offset,File}) of case get({Name,offset,File}) of
undefined -> undefined ->
@ -503,8 +404,6 @@ delete_migration(#yessir{name=Name}, _EpochID, File) ->
%% @doc Restricted API: Delete a file after it has been successfully %% @doc Restricted API: Delete a file after it has been successfully
%% migrated. %% migrated.
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(_Host, _TcpPort, EpochID, File) -> delete_migration(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try
@ -516,16 +415,12 @@ delete_migration(_Host, _TcpPort, EpochID, File) ->
%% @doc Restricted API: Truncate a file after it has been successfully %% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded. %% erasure coded.
-spec trunc_hack(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(#yessir{name=Name}, _EpochID, File) -> trunc_hack(#yessir{name=Name}, _EpochID, File) ->
put({Name,offset,File}, ?MINIMUM_OFFSET). put({Name,offset,File}, ?MINIMUM_OFFSET).
%% @doc Restricted API: Truncate a file after it has been successfully %% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded. %% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(_Host, _TcpPort, EpochID, File) -> trunc_hack(_Host, _TcpPort, EpochID, File) ->
Sock = connect(#p_srvr{proto_mod=?MODULE}), Sock = connect(#p_srvr{proto_mod=?MODULE}),
try try

View file

@ -135,6 +135,29 @@ smoke_test2() ->
%% Exactly one file right now %% Exactly one file right now
{ok, [_]} = machi_cr_client:list_files(C1), {ok, [_]} = machi_cr_client:list_files(C1),
%% Go back and test append_chunk_extra() and write_chunk()
Chunk10 = <<"It's a different chunk!">>,
Size10 = byte_size(Chunk10),
Extra10 = 5,
{ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10),
[begin
Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available.
%% {error,not_written} = machi_cr_client:read_chunk(C1, File10,
%% Offx, Size10),
{ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx,
Size10)
end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
%% Double-check that our reserved extra bytes were really honored!
true = (Off11 > (Off10 + (Extra10 * Size10))),
ok ok
after after
error_logger:tty(true), error_logger:tty(true),

View file

@ -0,0 +1,79 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_pb_cr_client_test).
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(C, machi_pb_cr_client).
smoke_test_() ->
{timeout, 5*60, fun() -> smoke_test2() end}.
smoke_test2() ->
Port = 5720,
Ps = [{a,#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}}
],
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
try
[begin
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
end || {_,P} <- Ps],
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
{_, P_a} = hd(Ps),
{ok, Sock} = gen_tcp:connect(P_a#p_srvr.address, P_a#p_srvr.port,
[{packet, line}, binary, {active, false}]),
try
Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>,
ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>),
{ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, [{packet,4}]),
R1a = #mpb_request{req_id= <<0>>,
echo=#mpb_echoreq{message = <<"Hello, world!">>}},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
R1b = machi_pb:decode_mpb_response(Bin1B),
true = is_record(R1b, mpb_response),
ok
after
(catch gen_tcp:close(Sock))
end
after
exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
machi_util:wait_for_death(SupPid, 100),
ok
end.
-endif. % !PULSE
-endif. % TEST

67
test/machi_pb_test.erl Normal file
View file

@ -0,0 +1,67 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_pb_test).
-include("machi_pb.hrl").
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-include_lib("eunit/include/eunit.hrl").
%% We don't expect any problems with these functions: nearly all
%% code executed is compiled from the PB definition file(s) and so
%% ought to be bug-free. Errors are likely to be caused by
%% changes in the spec but without corresponding changes to
%% message types/names here.
smoke_requests_test() ->
Echo0 = #mpb_request{req_id= <<"x">>,
echo=#mpb_echoreq{}},
Echo0 = encdec_request(Echo0),
Echo1 = #mpb_request{req_id= <<"x">>,
echo=#mpb_echoreq{message="Yo!"}},
Echo1 = encdec_request(Echo1),
ok.
smoke_responses_test() ->
R1 = #mpb_response{req_id= <<"x">>,
generic=#mpb_errorresp{code=7,
msg="foo",
extra= <<"bar">>}},
R1 = encdec_response(R1),
ok.
encdec_request(M) ->
machi_pb:decode_mpb_request(
list_to_binary(machi_pb:encode_mpb_request(M))).
encdec_response(M) ->
machi_pb:decode_mpb_response(
list_to_binary(machi_pb:encode_mpb_response(M))).
-endif. % !PULSE
-endif. % TEST