diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index 182d83c..f5702c2 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -31,16 +31,15 @@ -include("machi.hrl"). -include("machi_projection.hrl"). --define(T, machi_dt). -define(FLU_C, machi_flu1_client). --spec verify_file_checksums_local(port(), ?T:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_local(port(), machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) -> verify_file_checksums_local2(Sock1, EpochID, Path). --spec verify_file_checksums_local(?T:inet_host(), ?T:inet_port(), - ?T:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_local(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), @@ -50,13 +49,13 @@ verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> catch ?FLU_C:disconnect(Sock1) end. --spec verify_file_checksums_remote(port(), ?T:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_remote(port(), machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) -> verify_file_checksums_remote2(Sock1, EpochID, File). --spec verify_file_checksums_remote(?T:inet_host(), ?T:inet_port(), - ?T:epoch_id(), binary()|list()) -> +-spec verify_file_checksums_remote(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Host, TcpPort, EpochID, File) -> Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), diff --git a/src/machi_basho_bench_driver.erl b/src/machi_basho_bench_driver.erl index e1bfcb5..6fd4b59 100644 --- a/src/machi_basho_bench_driver.erl +++ b/src/machi_basho_bench_driver.erl @@ -26,7 +26,7 @@ %% use basho_bench to measure its performance under a certain %% workload. Machi is a bit different than most KV stores in that the %% client has no direct control over the keys -- Machi servers always -%% assign the keys. The schemes typically used by basho_bench & YCSB +%% assign the keys. The schemes typically used by basho_bench & YCSB %% to use/mimic key naming conventions used internally ... are %% difficult to adapt to Machi. %% @@ -50,7 +50,7 @@ %% %% TODO: As an alternate idea, if we know that the chunks written are %% always the same size, and if we don't care about CRC checking, then -%% all we need to know are the file names & file sizes on the server: +%% all we need to know are the file names & file sizes on the server: %% we can then pick any valid offset within that file. That would %% certainly be more scalable than the zillion-row-ETS-table, which is %% definitely RAM-hungry. diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 403c9d6..39c7ab4 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -83,8 +83,6 @@ update_wedge_state/3]). -export([make_listener_regname/1, make_projection_server_regname/1]). --define(T, machi_dt). - -record(state, { flu_name :: atom(), proj_store :: pid(), @@ -93,7 +91,7 @@ data_dir :: string(), wedged = true :: boolean(), etstab :: ets:tid(), - epoch_id :: 'undefined' | ?T:epoch_id(), + epoch_id :: 'undefined' | machi_dt:epoch_id(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist }). diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index a5a1c2d..c19a36a 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -85,24 +85,22 @@ trunc_hack/3, trunc_hack/4 ]). --define(T, machi_dt). - -type port_wrap() :: {w,atom(),term()}. %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port_wrap(), ?T:epoch_id(), ?T:file_prefix(), ?T:chunk()) -> - {ok, ?T:chunk_pos()} | {error, ?T:error_general()} | {error, term()}. +-spec append_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk2(Sock, EpochID, Prefix, Chunk, 0). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(?T:inet_host(), ?T:inet_port(), - ?T:epoch_id(), ?T:file_prefix(), ?T:chunk()) -> - {ok, ?T:chunk_pos()} | {error, ?T:error_general()} | {error, term()}. +-spec append_chunk(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -119,8 +117,8 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(port_wrap(), ?T:epoch_id(), ?T:file_prefix(), ?T:chunk(), ?T:chunk_size()) -> - {ok, ?T:chunk_pos()} | {error, ?T:error_general()} | {error, term()}. +-spec append_chunk_extra(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra). @@ -133,9 +131,9 @@ append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(?T:inet_host(), ?T:inet_port(), - ?T:epoch_id(), ?T:file_prefix(), ?T:chunk(), ?T:chunk_size()) -> - {ok, ?T:chunk_pos()} | {error, ?T:error_general()} | {error, term()}. +-spec append_chunk_extra(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -147,9 +145,9 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(port_wrap(), ?T:epoch_id(), ?T:file_name(), ?T:file_offset(), ?T:chunk_size()) -> - {ok, ?T:chunk_s()} | - {error, ?T:error_general() | 'not_written' | 'partial_read'} | +-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_s()} | + {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Sock, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -157,10 +155,10 @@ read_chunk(Sock, EpochID, File, Offset, Size) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(?T:inet_host(), ?T:inet_port(), ?T:epoch_id(), - ?T:file_name(), ?T:file_offset(), ?T:chunk_size()) -> - {ok, ?T:chunk_s()} | - {error, ?T:error_general() | 'not_written' | 'partial_read'} | +-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), + machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + {ok, machi_dt:chunk_s()} | + {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -173,18 +171,18 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size) %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(port_wrap(), ?T:epoch_id(), ?T:file_name()) -> - {ok, [?T:chunk_summary()]} | - {error, ?T:error_general() | 'no_such_file' | 'partial_read'} | +-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + {ok, [machi_dt:chunk_summary()]} | + {error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. checksum_list(Sock, EpochID, File) -> checksum_list2(Sock, EpochID, File). %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(?T:inet_host(), ?T:inet_port(), ?T:epoch_id(), ?T:file_name()) -> - {ok, [?T:chunk_summary()]} | - {error, ?T:error_general() | 'no_such_file'} | {error, term()}. +-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + {ok, [machi_dt:chunk_summary()]} | + {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -195,15 +193,15 @@ checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %% @doc Fetch the list of all files on the remote FLU. --spec list_files(port_wrap(), ?T:epoch_id()) -> - {ok, [?T:file_info()]} | {error, term()}. +-spec list_files(port_wrap(), machi_dt:epoch_id()) -> + {ok, [machi_dt:file_info()]} | {error, term()}. list_files(Sock, EpochID) -> list2(Sock, EpochID). %% @doc Fetch the list of all files on the remote FLU. --spec list_files(?T:inet_host(), ?T:inet_port(), ?T:epoch_id()) -> - {ok, [?T:file_info()]} | {error, term()}. +-spec list_files(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id()) -> + {ok, [machi_dt:file_info()]} | {error, term()}. list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -215,15 +213,15 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> %% @doc Fetch the wedge status from the remote FLU. -spec wedge_status(port_wrap()) -> - {ok, {boolean(), ?T:epoch_id()}} | {error, term()}. + {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}. wedge_status(Sock) -> wedge_status2(Sock). %% @doc Fetch the wedge status from the remote FLU. --spec wedge_status(?T:inet_host(), ?T:inet_port()) -> - {ok, {boolean(), ?T:epoch_id()}} | {error, term()}. +-spec wedge_status(machi_dt:inet_host(), machi_dt:inet_port()) -> + {ok, {boolean(), machi_dt:epoch_id()}} | {error, term()}. wedge_status(Host, TcpPort) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -234,16 +232,16 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epochid(port_wrap(), ?T:projection_type()) -> - {ok, ?T:epoch_id()} | {error, term()}. +-spec get_latest_epochid(port_wrap(), machi_dt:projection_type()) -> + {ok, machi_dt:epoch_id()} | {error, term()}. get_latest_epochid(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> get_latest_epochid2(Sock, ProjType). %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epochid(?T:inet_host(), ?T:inet_port(), ?T:projection_type()) -> - {ok, ?T:epoch_id()} | {error, term()}. +-spec get_latest_epochid(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> + {ok, machi_dt:epoch_id()} | {error, term()}. get_latest_epochid(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -255,17 +253,17 @@ get_latest_epochid(Host, TcpPort, ProjType) %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(port_wrap(), ?T:projection_type()) -> - {ok, ?T:projection()} | {error, not_written} | {error, term()}. +-spec read_latest_projection(port_wrap(), machi_dt:projection_type()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_latest_projection(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> read_latest_projection2(Sock, ProjType). %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(?T:inet_host(), ?T:inet_port(), - ?T:projection_type()) -> - {ok, ?T:projection()} | {error, not_written} | {error, term()}. +-spec read_latest_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_latest_projection(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -277,17 +275,17 @@ read_latest_projection(Host, TcpPort, ProjType) %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(port_wrap(), ?T:projection_type(), ?T:epoch_num()) -> - {ok, ?T:projection()} | {error, not_written} | {error, term()}. +-spec read_projection(port_wrap(), machi_dt:projection_type(), machi_dt:epoch_num()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_projection(Sock, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> read_projection2(Sock, ProjType, Epoch). %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(?T:inet_host(), ?T:inet_port(), - ?T:projection_type(), ?T:epoch_num()) -> - {ok, ?T:projection()} | {error, not_written} | {error, term()}. +-spec read_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type(), machi_dt:epoch_num()) -> + {ok, machi_dt:projection()} | {error, not_written} | {error, term()}. read_projection(Host, TcpPort, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -299,7 +297,7 @@ read_projection(Host, TcpPort, ProjType, Epoch) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(port_wrap(), ?T:projection_type(), ?T:projection()) -> +-spec write_projection(port_wrap(), machi_dt:projection_type(), machi_dt:projection()) -> 'ok' | {error, 'written'} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -308,8 +306,8 @@ write_projection(Sock, ProjType, Proj) %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(?T:inet_host(), ?T:inet_port(), - ?T:projection_type(), ?T:projection()) -> +-spec write_projection(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:projection_type(), machi_dt:projection()) -> 'ok' | {error, 'written'} | {error, term()}. write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -323,16 +321,16 @@ write_projection(Host, TcpPort, ProjType, Proj) %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(port_wrap(), ?T:projection_type()) -> - {ok, [?T:projection()]} | {error, term()}. +-spec get_all_projections(port_wrap(), machi_dt:projection_type()) -> + {ok, [machi_dt:projection()]} | {error, term()}. get_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> get_all_projections2(Sock, ProjType). %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(?T:inet_host(), ?T:inet_port(), ?T:projection_type()) -> - {ok, [?T:projection()]} | {error, term()}. +-spec get_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> + {ok, [machi_dt:projection()]} | {error, term()}. get_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -344,7 +342,7 @@ get_all_projections(Host, TcpPort, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(port_wrap(), ?T:projection_type()) -> +-spec list_all_projections(port_wrap(), machi_dt:projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -352,7 +350,7 @@ list_all_projections(Sock, ProjType) %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(?T:inet_host(), ?T:inet_port(), ?T:projection_type()) -> +-spec list_all_projections(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -394,8 +392,8 @@ disconnect(_) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port_wrap(), ?T:epoch_id(), ?T:file_name(), ?T:file_offset(), ?T:chunk()) -> - ok | {error, ?T:error_general()} | {error, term()}. +-spec write_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> write_chunk2(Sock, EpochID, File, Offset, Chunk). @@ -403,9 +401,9 @@ write_chunk(Sock, EpochID, File, Offset, Chunk) %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(?T:inet_host(), ?T:inet_port(), - ?T:epoch_id(), ?T:file_name(), ?T:file_offset(), ?T:chunk()) -> - ok | {error, ?T:error_general()} | {error, term()}. +-spec write_chunk(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), @@ -418,16 +416,16 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port_wrap(), ?T:epoch_id(), ?T:file_name()) -> - ok | {error, ?T:error_general() | 'no_such_file'} | {error, term()}. +-spec delete_migration(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. delete_migration(Sock, EpochID, File) -> delete_migration2(Sock, EpochID, File). %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(?T:inet_host(), ?T:inet_port(), ?T:epoch_id(), ?T:file_name()) -> - ok | {error, ?T:error_general() | 'no_such_file'} | {error, term()}. +-spec delete_migration(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try @@ -439,16 +437,16 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port_wrap(), ?T:epoch_id(), ?T:file_name()) -> - ok | {error, ?T:error_general() | 'no_such_file'} | {error, term()}. +-spec trunc_hack(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Sock, EpochID, File) -> trunc_hack2(Sock, EpochID, File). %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(?T:inet_host(), ?T:inet_port(), ?T:epoch_id(), ?T:file_name()) -> - ok | {error, ?T:error_general() | 'no_such_file'} | {error, term()}. +-spec trunc_hack(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> + ok | {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try