diff --git a/include/machi.hrl b/include/machi.hrl index f825556..6b35205 100644 --- a/include/machi.hrl +++ b/include/machi.hrl @@ -43,3 +43,15 @@ -define(DEFAULT_COC_NAMESPACE, ""). -define(DEFAULT_COC_LOCATOR, 0). + +-record(ns_info, { + version = 0 :: machi_dt:namespace_version(), + name = "" :: machi_dt:namespace(), + locator = 0 :: machi_dt:locator() + }). + +-record(append_opts, { + chunk_extra = 0 :: machi_dt:chunk_size(), + preferred_file_name :: 'undefined' | machi_dt:file_name_s(), + flag_fail_preferred = false :: boolean() + }). diff --git a/src/machi.proto b/src/machi.proto index 2645bde..462be0a 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -170,12 +170,15 @@ message Mpb_AuthResp { // High level API: append_chunk() request & response message Mpb_AppendChunkReq { - required string coc_namespace = 1; - required uint32 coc_locator = 2; + /* In single chain/non-clustered environment, use namespace="" */ + required string namespace = 1; required string prefix = 3; required bytes chunk = 4; required Mpb_ChunkCSum csum = 5; optional uint32 chunk_extra = 6; + optional string preferred_file_name = 7; + /* Fail the operation if our preferred file name is not available */ + optional uint32 flag_fail_preferred = 8; } message Mpb_AppendChunkResp { @@ -377,14 +380,17 @@ message Mpb_ProjectionV1 { // Low level API: append_chunk() message Mpb_LL_AppendChunkReq { - required Mpb_EpochID epoch_id = 1; - /* To avoid CoC use, use coc_namespace="" and coc_locator=0 */ - required string coc_namespace = 2; - required uint32 coc_locator = 3; - required string prefix = 4; - required bytes chunk = 5; - required Mpb_ChunkCSum csum = 6; - optional uint32 chunk_extra = 7; + required uint32 namespace_version = 1; + required string namespace = 2; + required uint32 locator = 3; + required Mpb_EpochID epoch_id = 4; + required string prefix = 5; + required bytes chunk = 6; + required Mpb_ChunkCSum csum = 7; + optional uint32 chunk_extra = 8; + optional string preferred_file_name = 9; + /* Fail the operation if our preferred file name is not available */ + optional uint32 flag_fail_preferred = 10; } message Mpb_LL_AppendChunkResp { diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index cec7c6a..c45823a 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -118,10 +118,8 @@ %% FLU1 API -export([ %% File API - append_chunk/3, append_chunk/4, - append_chunk/5, append_chunk/6, - append_chunk_extra/4, append_chunk_extra/5, - append_chunk_extra/6, append_chunk_extra/7, + append_chunk/5, + append_chunk/6, append_chunk/7, write_chunk/4, write_chunk/5, read_chunk/5, read_chunk/6, trim_chunk/4, trim_chunk/5, @@ -165,67 +163,27 @@ start_link(P_srvr_list, Opts) -> %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. -append_chunk(PidSpec, Prefix, Chunk) -> - append_chunk_extra(PidSpec, ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, 0, ?DEFAULT_TIMEOUT). +append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum) -> + append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, #append_opts{}, ?DEFAULT_TIMEOUT). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. -append_chunk(PidSpec, Prefix, Chunk, Timeout) -> - append_chunk_extra(PidSpec, ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, 0, Timeout). +append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, #append_opts{}=Opts) -> + append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, Opts, ?DEFAULT_TIMEOUT). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. -append_chunk(PidSpec, CoC_Namespace, CoC_Locator, Prefix, Chunk) -> - append_chunk_extra(PidSpec, CoC_Namespace, CoC_Locator, - Prefix, Chunk, 0, ?DEFAULT_TIMEOUT). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk(PidSpec, CoC_Namespace, CoC_Locator, Prefix, Chunk, Timeout) -> - append_chunk_extra(PidSpec, CoC_Namespace, CoC_Locator, - Prefix, Chunk, 0, Timeout). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, ?DEFAULT_TIMEOUT). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout0) -> +append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, #append_opts{}=Opts, Timeout0) -> + NSInfo2 = machi_util:ns_info_default(NSInfo), {TO, Timeout} = timeout(Timeout0), - gen_server:call(PidSpec, {req, {append_chunk_extra, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, - Chunk, ChunkExtra, TO}}, - Timeout). - -append_chunk_extra(PidSpec, CoC_Namespace, CoC_Locator, Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - append_chunk_extra(PidSpec, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, ?DEFAULT_TIMEOUT). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk_extra(PidSpec, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, Timeout0) -> - {TO, Timeout} = timeout(Timeout0), - gen_server:call(PidSpec, {req, {append_chunk_extra, - CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, TO}}, + gen_server:call(PidSpec, {req, {append_chunk, + NSInfo2, Prefix, Chunk, CSum, Opts, TO}}, Timeout). %% @doc Write a chunk of data (that has already been -%% allocated/sequenced by an earlier append_chunk_extra() call) to +%% allocated/sequenced by an earlier append_chunk() call) to %% `File' at `Offset'. write_chunk(PidSpec, File, Offset, Chunk) -> @@ -324,10 +282,10 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% -handle_call2({append_chunk_extra, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, TO}, _From, S) -> - do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, 0, os:timestamp(), TO, S); +handle_call2({append_chunk, NSInfo, + Prefix, Chunk, CSum, Opts, TO}, _From, S) -> + do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, 0, os:timestamp(), TO, S); handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) -> do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S); handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) -> @@ -339,12 +297,12 @@ handle_call2({checksum_list, File, TO}, _From, S) -> handle_call2({list_files, TO}, _From, S) -> do_list_files(0, os:timestamp(), TO, S). -do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, 0=Depth, STime, TO, S) -> - do_append_head2(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth + 1, STime, TO, S); -do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, #state{proj=P}=S) -> +do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, 0=Depth, STime, TO, S) -> + do_append_head2(NSInfo, Prefix, + Chunk, CSum, Opts, Depth + 1, STime, TO, S); +do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, #state{proj=P}=S) -> %% io:format(user, "head sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, @@ -359,62 +317,61 @@ do_append_head(CoC_Namespace, CoC_Locator, Prefix, case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth + 1, + do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, Depth + 1, STime, TO, S2); _ -> - do_append_head2(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth + 1, + do_append_head2(NSInfo, Prefix, + Chunk, CSum, Opts, Depth + 1, STime, TO, S2) end end. -do_append_head2(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, +do_append_head2(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, #state{proj=P}=S) -> [HeadFLU|_RestFLUs] = mutation_flus(P), case is_witness_flu(HeadFLU, P) of true -> case witnesses_use_our_epoch(S) of true -> - do_append_head3(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, + do_append_head3(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, S); false -> %% Bummer, go back to the beginning and retry. - do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, + do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, S) end; false -> - do_append_head3(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, S) + do_append_head3(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, S) end. -do_append_head3(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, +do_append_head3(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> [HeadFLU|RestFLUs] = non_witness_flus(mutation_flus(P), P), Proxy = orddict:fetch(HeadFLU, PD), - case ?FLU_PC:append_chunk_extra(Proxy, EpochID, - CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, ?TIMEOUT) of + case ?FLU_PC:append_chunk(Proxy, NSInfo, EpochID, + Prefix, Chunk, CSum, Opts, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - do_append_midtail(RestFLUs, CoC_Namespace, CoC_Locator, Prefix, - File, Offset, Chunk, ChunkExtra, + do_append_midtail(RestFLUs, NSInfo, Prefix, + File, Offset, Chunk, CSum, Opts, [HeadFLU], 0, STime, TO, S); {error, bad_checksum}=BadCS -> {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, S); + do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, S); {error, written} -> %% Implicit sequencing + this error = we don't know where this %% written block is. But we lost a race. Repeat, with a new %% sequencer assignment. - do_append_head(CoC_Namespace, CoC_Locator, Prefix, - Chunk, ChunkExtra, Depth, STime, TO, S); + do_append_head(NSInfo, Prefix, + Chunk, CSum, Opts, Depth, STime, TO, S); {error, trimmed} = Err -> %% TODO: behaviour {reply, Err, S}; @@ -423,15 +380,15 @@ do_append_head3(CoC_Namespace, CoC_Locator, Prefix, Prefix,iolist_size(Chunk)}) end. -do_append_midtail(RestFLUs, CoC_Namespace, CoC_Locator, Prefix, - File, Offset, Chunk, ChunkExtra, +do_append_midtail(RestFLUs, NSInfo, Prefix, + File, Offset, Chunk, CSum, Opts, Ws, Depth, STime, TO, S) when RestFLUs == [] orelse Depth == 0 -> - do_append_midtail2(RestFLUs, CoC_Namespace, CoC_Locator, Prefix, - File, Offset, Chunk, ChunkExtra, + do_append_midtail2(RestFLUs, NSInfo, Prefix, + File, Offset, Chunk, CSum, Opts, Ws, Depth + 1, STime, TO, S); -do_append_midtail(_RestFLUs, CoC_Namespace, CoC_Locator, Prefix, File, - Offset, Chunk, ChunkExtra, +do_append_midtail(_RestFLUs, NSInfo, Prefix, File, + Offset, Chunk, CSum, Opts, Ws, Depth, STime, TO, #state{proj=P}=S) -> %% io:format(user, "midtail sleep2,", []), sleep_a_while(Depth), @@ -458,44 +415,44 @@ do_append_midtail(_RestFLUs, CoC_Namespace, CoC_Locator, Prefix, File, if Prefix == undefined -> % atom! not binary()!! {error, partition}; true -> - do_append_head2(CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, + do_append_head2(NSInfo, + Prefix, Chunk, CSum, Opts, Depth, STime, TO, S2) end; RestFLUs3 -> do_append_midtail2(RestFLUs3, - CoC_Namespace, CoC_Locator, + NSInfo, Prefix, File, Offset, - Chunk, ChunkExtra, + Chunk, CSum, Opts, Ws, Depth + 1, STime, TO, S2) end end end. -do_append_midtail2([], _CoC_Namespace, _CoC_Locator, +do_append_midtail2([], _NSInfo, _Prefix, File, Offset, Chunk, - _ChunkExtra, _Ws, _Depth, _STime, _TO, S) -> + _CSum, _Opts, _Ws, _Depth, _STime, _TO, S) -> %% io:format(user, "ok!\n", []), {reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S}; -do_append_midtail2([FLU|RestFLUs]=FLUs, CoC_Namespace, CoC_Locator, +do_append_midtail2([FLU|RestFLUs]=FLUs, NSInfo, Prefix, File, Offset, Chunk, - ChunkExtra, Ws, Depth, STime, TO, + CSum, Opts, Ws, Depth, STime, TO, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> %% io:format(user, "write ~w,", [FLU]), - do_append_midtail2(RestFLUs, CoC_Namespace, CoC_Locator, Prefix, + do_append_midtail2(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, - ChunkExtra, [FLU|Ws], Depth, STime, TO, S); + CSum, Opts, [FLU|Ws], Depth, STime, TO, S); {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_append_midtail(FLUs, CoC_Namespace, CoC_Locator, Prefix, + do_append_midtail(FLUs, NSInfo, Prefix, File, Offset, Chunk, - ChunkExtra, Ws, Depth, STime, TO, S); + CSum, Opts, Ws, Depth, STime, TO, S); {error, written} -> %% We know what the chunk ought to be, so jump to the %% middle of read-repair. @@ -559,9 +516,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO, ok -> %% From this point onward, we use the same code & logic path as %% append does. - do_append_midtail(RestFLUs, undefined, undefined, undefined, +NSInfo=todo,Prefix=todo,CSum=todo,Opts=todo, + do_append_midtail(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, - undefined, [HeadFLU], 0, STime, TO, S); + CSum, Opts, [HeadFLU], 0, STime, TO, S); {error, bad_checksum}=BadCS -> {reply, BadCS, S}; {error, Retry} diff --git a/src/machi_dt.erl b/src/machi_dt.erl index daf26dd..13e7836 100644 --- a/src/machi_dt.erl +++ b/src/machi_dt.erl @@ -20,8 +20,10 @@ -module(machi_dt). +-include("machi.hrl"). -include("machi_projection.hrl"). +-type append_opts() :: #append_opts{}. -type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. -type chunk_bin() :: binary() | iolist(). % client can use either -type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum @@ -29,9 +31,6 @@ -type chunk_s() :: 'trimmed' | binary(). -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). --type coc_namespace() :: string(). --type coc_nl() :: {coc, coc_namespace(), coc_locator()}. --type coc_locator() :: non_neg_integer(). -type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'. -type epoch_csum() :: binary(). -type epoch_num() :: -1 | non_neg_integer(). @@ -44,6 +43,10 @@ -type file_prefix() :: binary() | list(). -type inet_host() :: inet:ip_address() | inet:hostname(). -type inet_port() :: inet:port_number(). +-type locator() :: number(). +-type namespace() :: string(). +-type namespace_version() :: non_neg_integer(). +-type ns_info() :: #ns_info{}. -type projection() :: #projection_v1{}. -type projection_type() :: 'public' | 'private'. @@ -53,6 +56,7 @@ -type csum_tag() :: none | client_sha | server_sha | server_regen_sha. -export_type([ + append_opts/0, chunk/0, chunk_bin/0, chunk_csum/0, @@ -61,9 +65,6 @@ chunk_s/0, chunk_pos/0, chunk_size/0, - coc_namespace/0, - coc_nl/0, - coc_locator/0, error_general/0, epoch_csum/0, epoch_num/0, @@ -76,6 +77,9 @@ file_prefix/0, inet_host/0, inet_port/0, + namespace/0, + namespace_version/0, + ns_info/0, projection/0, projection_type/0 ]). diff --git a/src/machi_flu1_append_server.erl b/src/machi_flu1_append_server.erl index a7b029c..9a41776 100644 --- a/src/machi_flu1_append_server.erl +++ b/src/machi_flu1_append_server.erl @@ -82,25 +82,34 @@ init([Fluname, Witness_p, Wedged_p, EpochId]) -> {ok, #state{flu_name=Fluname, witness=Witness_p, wedged=Wedged_p, etstab=TID, epoch_id=EpochId}}. -handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}, +handle_call({seq_append, _From2, _NSInfo, _EpochID, _Prefix, _Chunk, _TCSum, _Opts}, _From, #state{witness=true}=S) -> %% The FLU's machi_flu1_net_server process ought to filter all %% witness states, but we'll keep this clause for extra %% paranoia. {reply, witness, S}; -handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}, +handle_call({seq_append, _From2, _NSInfo, _EpochID, _Prefix, _Chunk, _TCSum, _Opts}, _From, #state{wedged=true}=S) -> {reply, wedged, S}; -handle_call({seq_append, _From2, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, EpochID}, +handle_call({seq_append, _From2, NSInfo, EpochID, + Prefix, Chunk, TCSum, Opts}, From, #state{flu_name=FluName, epoch_id=OldEpochId}=S) -> + %% io:format(user, " + %% HANDLE_CALL append_chunk + %% NSInfo=~p + %% epoch_id=~p + %% prefix=~p + %% chunk=~p + %% tcsum=~p + %% opts=~p\n", + %% [NSInfo, EpochID, Prefix, Chunk, TCSum, Opts]), %% Old is the one from our state, plain old 'EpochID' comes %% from the client. _ = case OldEpochId of EpochID -> spawn(fun() -> - append_server_dispatch(From, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, + append_server_dispatch(From, NSInfo, + Prefix, Chunk, TCSum, Opts, FluName, EpochID) end), {noreply, S}; @@ -161,10 +170,10 @@ terminate(Reason, _S) -> code_change(_OldVsn, S, _Extra) -> {ok, S}. -append_server_dispatch(From, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, FluName, EpochId) -> - Result = case handle_append(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, FluName, EpochId) of +append_server_dispatch(From, NSInfo, + Prefix, Chunk, TCSum, Opts, FluName, EpochId) -> + Result = case handle_append(NSInfo, + Prefix, Chunk, TCSum, Opts, FluName, EpochId) of {ok, File, Offset} -> {assignment, Offset, File}; Other -> @@ -173,19 +182,17 @@ append_server_dispatch(From, CoC_Namespace, CoC_Locator, _ = gen_server:reply(From, Result), ok. -handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) -> - {error, bad_arg}; -handle_append(CoC_Namespace, CoC_Locator, - Prefix, Chunk, Csum, Extra, FluName, EpochId) -> - CoC = {coc, CoC_Namespace, CoC_Locator}, +handle_append(NSInfo, + Prefix, Chunk, TCSum, Opts, FluName, EpochId) -> Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix( - FluName, EpochId, {prefix, Prefix}, CoC), + FluName, EpochId, {prefix, Prefix}, NSInfo), case Res of {file, F} -> case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of {ok, Pid} -> - {Tag, CS} = machi_util:unmake_tagged_csum(Csum), + {Tag, CS} = machi_util:unmake_tagged_csum(TCSum), Meta = [{client_csum_tag, Tag}, {client_csum, CS}], + Extra = Opts#append_opts.chunk_extra, machi_file_proxy:append(Pid, Meta, Extra, Chunk); {error, trimmed} = E -> E diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index e5b65fc..10c833e 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -50,14 +50,13 @@ -include_lib("pulse_otp/include/pulse_otp.hrl"). -endif. --define(HARD_TIMEOUT, 2500). +-define(SHORT_TIMEOUT, 2500). +-define(LONG_TIMEOUT, (60*1000)). -export([ %% File API - append_chunk/4, append_chunk/5, append_chunk/6, append_chunk/7, - append_chunk_extra/5, append_chunk_extra/6, - append_chunk_extra/7, append_chunk_extra/8, + append_chunk/8, append_chunk/9, read_chunk/6, read_chunk/7, checksum_list/3, checksum_list/4, list_files/2, list_files/3, @@ -89,142 +88,61 @@ -type port_wrap() :: {w,atom(),term()}. -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - --spec append_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> +-spec append_chunk(port_wrap(), + machi_dt:ns_info(), machi_dt:epoch_id(), + machi_dt:file_prefix(), machi_dt:chunk(), + machi_dt:chunk_csum()) -> {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk(Sock, EpochID, Prefix, Chunk) -> - append_chunk2(Sock, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, 0). +append_chunk(Sock, NSInfo, EpochID, Prefix, Chunk, CSum) -> + append_chunk(Sock, NSInfo, EpochID, Prefix, Chunk, CSum, + #append_opts{}, ?LONG_TIMEOUT). %% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. +%% with `Prefix' and also request an additional `Extra' bytes. +%% +%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then +%% the file offsets that follow `Chunk''s position for the following 4K will +%% be reserved by the file sequencer for later write(s) by the +%% `write_chunk()' API. -spec append_chunk(machi_dt:inet_host(), machi_dt:inet_port(), - machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk()) -> + machi_dt:ns_info(), machi_dt:epoch_id(), + machi_dt:file_prefix(), machi_dt:chunk(), + machi_dt:chunk_csum()) -> {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> - Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), - try - append_chunk2(Sock, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, 0) - after - disconnect(Sock) - end. +append_chunk(Host, TcpPort, NSInfo, EpochID, Prefix, Chunk, CSum) -> + append_chunk(Host, TcpPort, NSInfo, EpochID, Prefix, Chunk, CSum, + #append_opts{}, ?LONG_TIMEOUT). + +-spec append_chunk(port_wrap(), + machi_dt:ns_info(), machi_dt:epoch_id(), + machi_dt:file_prefix(), machi_dt:chunk(), + machi_dt:chunk_csum(), machi_dt:append_opts(), timeout()) -> + {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. +append_chunk(Sock, NSInfo0, EpochID, Prefix, Chunk, CSum, Opts, Timeout) -> + NSInfo = machi_util:ns_info_default(NSInfo0), + append_chunk2(Sock, NSInfo, EpochID, Prefix, Chunk, CSum, Opts, Timeout). %% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - --spec append_chunk(port_wrap(), machi_dt:epoch_id(), - machi_dt:coc_namespace(), machi_dt:coc_locator(), - machi_dt:file_prefix(), machi_dt:chunk()) -> - {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk(Sock, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk) -> - append_chunk2(Sock, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, 0). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. +%% with `Prefix' and also request an additional `Extra' bytes. +%% +%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then +%% the file offsets that follow `Chunk''s position for the following 4K will +%% be reserved by the file sequencer for later write(s) by the +%% `write_chunk()' API. -spec append_chunk(machi_dt:inet_host(), machi_dt:inet_port(), - machi_dt:epoch_id(), - machi_dt:coc_namespace(), machi_dt:coc_locator(), - machi_dt:file_prefix(), machi_dt:chunk()) -> + machi_dt:ns_info(), machi_dt:epoch_id(), + machi_dt:file_prefix(), machi_dt:chunk(), + machi_dt:chunk_csum(), machi_dt:append_opts(), timeout()) -> {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk(Host, TcpPort, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk) -> +append_chunk(Host, TcpPort, NSInfo0, EpochID, + Prefix, Chunk, CSum, Opts, Timeout) -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try - append_chunk2(Sock, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, 0) - after - disconnect(Sock) - end. - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix' and also request an additional `Extra' bytes. -%% -%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then -%% the file offsets that follow `Chunk''s position for the following 4K will -%% be reserved by the file sequencer for later write(s) by the -%% `write_chunk()' API. - --spec append_chunk_extra(port_wrap(), machi_dt:epoch_id(), - machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> - {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - append_chunk2(Sock, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, ChunkExtra). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix' and also request an additional `Extra' bytes. -%% -%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then -%% the file offsets that follow `Chunk''s position for the following 4K will -%% be reserved by the file sequencer for later write(s) by the -%% `write_chunk()' API. - --spec append_chunk_extra(machi_dt:inet_host(), machi_dt:inet_port(), - machi_dt:epoch_id(), machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> - {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), - try - append_chunk2(Sock, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, ChunkExtra) - after - disconnect(Sock) - end. - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix' and also request an additional `Extra' bytes. -%% -%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then -%% the file offsets that follow `Chunk''s position for the following 4K will -%% be reserved by the file sequencer for later write(s) by the -%% `write_chunk()' API. - --spec append_chunk_extra(port_wrap(), machi_dt:epoch_id(), - machi_dt:coc_namespace(), machi_dt:coc_locator(), - machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> - {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk_extra(Sock, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - append_chunk2(Sock, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix' and also request an additional `Extra' bytes. -%% -%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then -%% the file offsets that follow `Chunk''s position for the following 4K will -%% be reserved by the file sequencer for later write(s) by the -%% `write_chunk()' API. - --spec append_chunk_extra(machi_dt:inet_host(), machi_dt:inet_port(), - machi_dt:epoch_id(), - machi_dt:coc_namespace(), machi_dt:coc_locator(), - machi_dt:file_prefix(), machi_dt:chunk(), machi_dt:chunk_size()) -> - {ok, machi_dt:chunk_pos()} | {error, machi_dt:error_general()} | {error, term()}. -append_chunk_extra(Host, TcpPort, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), - try - append_chunk2(Sock, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra) + NSInfo = machi_util:ns_info_default(NSInfo0), + append_chunk2(Sock, NSInfo, EpochID, + Prefix, Chunk, CSum, Opts, Timeout) after disconnect(Sock) end. @@ -628,23 +546,24 @@ read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) -> {low_read_chunk, EpochID, File, Offset, Size, Opts}), do_pb_request_common(Sock, ReqID, Req). -append_chunk2(Sock, EpochID, CoC_Namespace, CoC_Locator, - Prefix0, Chunk0, ChunkExtra) -> +append_chunk2(Sock, NSInfo, EpochID, + Prefix0, Chunk, CSum0, Opts, Timeout) -> ReqID = <<"id">>, - {Chunk, CSum_tag, CSum} = - case Chunk0 of - X when is_binary(X) -> - {Chunk0, ?CSUM_TAG_NONE, <<>>}; - {ChunkCSum, Chk} -> - {Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum), - {Chk, Tag, CS} - end, Prefix = machi_util:make_binary(Prefix0), + {CSum_tag, CSum} = case CSum0 of + <<>> -> + {?CSUM_TAG_NONE, <<>>}; + {_Tag, _CS} -> + CSum0; + B when is_binary(B) -> + machi_util:unmake_tagged_csum(CSum0) + end, + #ns_info{version=NSVersion, name=NS, locator=NSLocator} = NSInfo, Req = machi_pb_translate:to_pb_request( ReqID, - {low_append_chunk, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, ChunkExtra}), - do_pb_request_common(Sock, ReqID, Req). + {low_append_chunk, NSVersion, NS, NSLocator, EpochID, + Prefix, Chunk, CSum_tag, CSum, Opts}), + do_pb_request_common(Sock, ReqID, Req, true, Timeout). write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> ReqID = <<"id">>, @@ -739,18 +658,18 @@ kick_projection_reaction2(Sock, _Options) -> ReqID = <<42>>, Req = machi_pb_translate:to_pb_request( ReqID, {low_proj, {kick_projection_reaction}}), - do_pb_request_common(Sock, ReqID, Req, false). + do_pb_request_common(Sock, ReqID, Req, false, ?LONG_TIMEOUT). do_pb_request_common(Sock, ReqID, Req) -> - do_pb_request_common(Sock, ReqID, Req, true). + do_pb_request_common(Sock, ReqID, Req, true, ?LONG_TIMEOUT). -do_pb_request_common(Sock, ReqID, Req, GetReply_p) -> +do_pb_request_common(Sock, ReqID, Req, GetReply_p, Timeout) -> erase(bad_sock), try ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), ok = w_send(Sock, ReqBin), if GetReply_p -> - case w_recv(Sock, 0) of + case w_recv(Sock, 0, Timeout) of {ok, RespBin} -> Resp = machi_pb:decode_mpb_ll_response(RespBin), {ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp), @@ -796,7 +715,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props}=_P)-> case proplists:get_value(session_proto, Props, tcp) of tcp -> put(xxx, goofus), - Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT), + Sock = machi_util:connect(Host, Port, ?SHORT_TIMEOUT), put(xxx, Sock), ok = inet:setopts(Sock, ?PB_PACKET_OPTS), {w,tcp,Sock}; @@ -820,8 +739,8 @@ w_close({w,tcp,Sock}) -> catch gen_tcp:close(Sock), ok. -w_recv({w,tcp,Sock}, Amt) -> - gen_tcp:recv(Sock, Amt, ?HARD_TIMEOUT). +w_recv({w,tcp,Sock}, Amt, Timeout) -> + gen_tcp:recv(Sock, Amt, Timeout). w_send({w,tcp,Sock}, IoData) -> gen_tcp:send(Sock, IoData). diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 6610230..588b8d8 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -264,13 +264,24 @@ do_pb_ll_request3({low_proj, PCMD}, S) -> {do_server_proj_request(PCMD, S), S}; %% Witness status *matters* below -do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, +do_pb_ll_request3({low_append_chunk, NSVersion, NS, NSLocator, EpochID, Prefix, Chunk, CSum_tag, - CSum, ChunkExtra}, + CSum, Opts}, #state{witness=false}=S) -> - {do_server_append_chunk(CoC_Namespace, CoC_Locator, + %% io:format(user, " + %% append_chunk namespace_version=~p + %% namespace=~p + %% locator=~p + %% epoch_id=~p + %% prefix=~p + %% chunk=~p + %% csum={~p,~p} + %% opts=~p\n", + %% [NSVersion, NS, NSLocator, EpochID, Prefix, Chunk, CSum_tag, CSum, Opts]), + NSInfo = #ns_info{version=NSVersion, name=NS, locator=NSLocator}, + {do_server_append_chunk(NSInfo, EpochID, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S), S}; + Opts, S), S}; do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, CSum}, #state{witness=false}=S) -> @@ -334,27 +345,27 @@ do_server_proj_request({kick_projection_reaction}, end), async_no_response. -do_server_append_chunk(CoC_Namespace, CoC_Locator, +do_server_append_chunk(NSInfo, EpochID, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S) -> + Opts, S) -> case sanitize_prefix(Prefix) of ok -> - do_server_append_chunk2(CoC_Namespace, CoC_Locator, + do_server_append_chunk2(NSInfo, EpochID, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S); + Opts, S); _ -> {error, bad_arg} end. -do_server_append_chunk2(CoC_Namespace, CoC_Locator, +do_server_append_chunk2(NSInfo, EpochID, Prefix, Chunk, CSum_tag, Client_CSum, - ChunkExtra, #state{flu_name=FluName, - epoch_id=EpochID}=_S) -> + Opts, #state{flu_name=FluName, + epoch_id=EpochID}=_S) -> %% TODO: Do anything with PKey? try TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - R = {seq_append, self(), CoC_Namespace, CoC_Locator, - Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, + R = {seq_append, self(), NSInfo, EpochID, + Prefix, Chunk, TaggedCSum, Opts}, case gen_server:call(FluName, R, 10*1000) of {assignment, Offset, File} -> Size = iolist_size(Chunk), @@ -563,13 +574,11 @@ do_pb_hl_request2({high_echo, Msg}, S) -> {Msg, S}; do_pb_hl_request2({high_auth, _User, _Pass}, S) -> {-77, S}; -do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator, - Prefix, ChunkBin, TaggedCSum, - ChunkExtra}, #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator, - Prefix, Chunk, - ChunkExtra), +do_pb_hl_request2({high_append_chunk, NS, Prefix, Chunk, TaggedCSum, Opts}, + #state{high_clnt=Clnt}=S) -> + NSInfo = #ns_info{name=NS}, % TODO populate other fields + Res = machi_cr_client:append_chunk(Clnt, NSInfo, + Prefix, Chunk, TaggedCSum, Opts), {Res, S}; do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, #state{high_clnt=Clnt}=S) -> diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index 293fdc3..7140266 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -67,6 +67,7 @@ ]). -define(TIMEOUT, 10 * 1000). +-include("machi.hrl"). %% included for #ns_info record -include("machi_projection.hrl"). %% included for pv1_epoch type -record(state, {fluname :: atom(), @@ -90,28 +91,28 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> -spec find_or_make_filename_from_prefix( FluName :: atom(), EpochId :: pv1_epoch(), Prefix :: {prefix, string()}, - machi_dt:coc_nl()) -> + machi_dt:ns_info()) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. % @doc Find the latest available or make a filename from a prefix. A prefix % should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged % tuple in the form of `{file, F}' or an `{error, Reason}' find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}, - {coc, _CoC_Ns, _CoC_Loc}=CoC_NL) + #ns_info{}=NSInfo) when is_atom(FluName) -> N = make_filename_mgr_name(FluName), - gen_server:call(N, {find_filename, EpochId, CoC_NL, Prefix}, ?TIMEOUT); + gen_server:call(N, {find_filename, EpochId, NSInfo, Prefix}, ?TIMEOUT); find_or_make_filename_from_prefix(_FluName, _EpochId, Other, Other2) -> - lager:error("~p is not a valid prefix/CoC ~p", [Other, Other2]), + lager:error("~p is not a valid prefix/locator ~p", [Other, Other2]), error(badarg). --spec increment_prefix_sequence( FluName :: atom(), CoC_NL :: machi_dt:coc_nl(), Prefix :: {prefix, string()} ) -> +-spec increment_prefix_sequence( FluName :: atom(), NSInfo :: machi_dt:ns_info(), Prefix :: {prefix, string()} ) -> ok | {error, Reason :: term() } | timeout. % @doc Increment the sequence counter for a given prefix. Prefix should % be in the form of `{prefix, P}'. -increment_prefix_sequence(FluName, {coc,_CoC_Namespace,_CoC_Locator}=CoC_NL, {prefix, Prefix}) when is_atom(FluName) -> - gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, CoC_NL, Prefix}, ?TIMEOUT); -increment_prefix_sequence(_FluName, _CoC_NL, Other) -> +increment_prefix_sequence(FluName, #ns_info{}=NSInfo, {prefix, Prefix}) when is_atom(FluName) -> + gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, NSInfo, Prefix}, ?TIMEOUT); +increment_prefix_sequence(_FluName, _NSInfo, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). @@ -142,23 +143,23 @@ handle_cast(Req, State) -> %% the FLU has already validated that the caller's epoch id and the FLU's epoch id %% are the same. So we *assume* that remains the case here - that is to say, we %% are not wedged. -handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, +handle_call({find_filename, EpochId, NSInfo, Prefix}, _From, S = #state{ datadir = DataDir, epoch = EpochId, tid = Tid }) -> %% Our state and the caller's epoch ids are the same. Business as usual. - File = handle_find_file(Tid, CoC_NL, Prefix, DataDir), + File = handle_find_file(Tid, NSInfo, Prefix, DataDir), {reply, {file, File}, S}; -handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> +handle_call({find_filename, EpochId, NSInfo, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> %% If the epoch id in our state and the caller's epoch id were the same, it would've %% matched the above clause. Since we're here, we know that they are different. %% If epoch ids between our state and the caller's are different, we must increment the %% sequence number, generate a filename and then cache it. - File = increment_and_cache_filename(Tid, DataDir, CoC_NL, Prefix), + File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix), {reply, {file, File}, S#state{epoch = EpochId}}; -handle_call({increment_sequence, {coc,CoC_Namespace,CoC_Locator}=_CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir }) -> - ok = machi_util:increment_max_filenum(DataDir, CoC_Namespace,CoC_Locator, Prefix), +handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir }) -> + ok = machi_util:increment_max_filenum(DataDir, NS, NSLocator, Prefix), {reply, ok, S}; handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) -> spawn(fun() -> @@ -191,9 +192,9 @@ generate_uuid_v4_str() -> io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). -find_file(DataDir, {coc,CoC_Namespace,CoC_Locator}=_CoC_NL, Prefix, N) -> +find_file(DataDir, #ns_info{name=NS, locator=NSLocator}=_NSInfo, Prefix, N) -> {_Filename, Path} = machi_util:make_data_filename(DataDir, - CoC_Namespace,CoC_Locator, + NS, NSLocator, Prefix, "*", N), filelib:wildcard(Path). @@ -204,11 +205,11 @@ list_files(DataDir, Prefix) -> make_filename_mgr_name(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_filename_mgr"). -handle_find_file(Tid, {coc,CoC_Namespace,CoC_Locator}=CoC_NL, Prefix, DataDir) -> - N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), - {File, Cleanup} = case find_file(DataDir, CoC_NL, Prefix, N) of +handle_find_file(Tid, #ns_info{name=NS, locator=NSLocator}=NSInfo, Prefix, DataDir) -> + N = machi_util:read_max_filenum(DataDir, NS, NSLocator, Prefix), + {File, Cleanup} = case find_file(DataDir, NSInfo, Prefix, N) of [] -> - {find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N), false}; + {find_or_make_filename(Tid, DataDir, NS, NSLocator, Prefix, N), false}; [H] -> {H, true}; [Fn | _ ] = L -> lager:debug( @@ -216,23 +217,23 @@ handle_find_file(Tid, {coc,CoC_Namespace,CoC_Locator}=CoC_NL, Prefix, DataDir) - [Prefix, N, L]), {Fn, true} end, - maybe_cleanup(Tid, {CoC_Namespace, CoC_Locator, Prefix, N}, Cleanup), + maybe_cleanup(Tid, {NS, NSLocator, Prefix, N}, Cleanup), filename:basename(File). -find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N) -> - case ets:lookup(Tid, {CoC_Namespace, CoC_Locator, Prefix, N}) of +find_or_make_filename(Tid, DataDir, NS, NSLocator, Prefix, N) -> + case ets:lookup(Tid, {NS, NSLocator, Prefix, N}) of [] -> - F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N), - true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}), + F = generate_filename(DataDir, NS, NSLocator, Prefix, N), + true = ets:insert_new(Tid, {{NS, NSLocator, Prefix, N}, F}), F; [{_Key, File}] -> File end. -generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N) -> +generate_filename(DataDir, NS, NSLocator, Prefix, N) -> {F, _} = machi_util:make_data_filename( DataDir, - CoC_Namespace, CoC_Locator, Prefix, + NS, NSLocator, Prefix, generate_uuid_v4_str(), N), binary_to_list(F). @@ -242,11 +243,11 @@ maybe_cleanup(_Tid, _Key, false) -> maybe_cleanup(Tid, Key, true) -> true = ets:delete(Tid, Key). -increment_and_cache_filename(Tid, DataDir, {coc,CoC_Namespace,CoC_Locator}, Prefix) -> - ok = machi_util:increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), - N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix), - F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N), - true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}), +increment_and_cache_filename(Tid, DataDir, #ns_info{name=NS,locator=NSLocator}, Prefix) -> + ok = machi_util:increment_max_filenum(DataDir, NS, NSLocator, Prefix), + N = machi_util:read_max_filenum(DataDir, NS, NSLocator, Prefix), + F = generate_filename(DataDir, NS, NSLocator, Prefix, N), + true = ets:insert_new(Tid, {{NS, NSLocator, Prefix, N}, F}), filename:basename(F). diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 5b2ab22..9c69358 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -38,7 +38,7 @@ connected_p/1, echo/2, echo/3, auth/3, auth/4, - append_chunk/7, append_chunk/8, + append_chunk/6, append_chunk/7, write_chunk/5, write_chunk/6, read_chunk/5, read_chunk/6, trim_chunk/4, trim_chunk/5, @@ -96,30 +96,33 @@ auth(PidSpec, User, Pass) -> auth(PidSpec, User, Pass, Timeout) -> send_sync(PidSpec, {auth, User, Pass}, Timeout). --spec append_chunk(pid(), CoC_namespace::binary(), CoC_locator::integer(), Prefix::binary(), Chunk::binary(), - CSum::binary(), ChunkExtra::non_neg_integer()) -> +-spec append_chunk(pid(), + NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(), + Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), + Opts::machi_dt:append_opts()) -> {ok, Filename::string(), Offset::machi_dt:file_offset()} | {error, machi_client_error_reason()}. -append_chunk(PidSpec, CoC_namespace, CoC_locator, Prefix, Chunk, CSum, ChunkExtra) -> - append_chunk(PidSpec, CoC_namespace, CoC_locator, Prefix, Chunk, CSum, ChunkExtra, ?DEFAULT_TIMEOUT). +append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts) -> + append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts, ?DEFAULT_TIMEOUT). --spec append_chunk(pid(), CoC_namespace::binary(), CoC_locator::integer(), Prefix::binary(), - Chunk::binary(), CSum::binary(), - ChunkExtra::non_neg_integer(), +-spec append_chunk(pid(), + NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(), + Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), + Opts::machi_dt:append_opts(), Timeout::non_neg_integer()) -> {ok, Filename::string(), Offset::machi_dt:file_offset()} | {error, machi_client_error_reason()}. -append_chunk(PidSpec, CoC_namespace, CoC_locator, Prefix, Chunk, CSum, ChunkExtra, Timeout) -> - send_sync(PidSpec, {append_chunk, CoC_namespace, CoC_locator, Prefix, Chunk, CSum, ChunkExtra}, Timeout). +append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts, Timeout) -> + send_sync(PidSpec, {append_chunk, NS, Prefix, Chunk, CSum, Opts}, Timeout). -spec write_chunk(pid(), File::string(), machi_dt:file_offset(), - Chunk::binary(), CSum::binary()) -> + Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum()) -> ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT). -spec write_chunk(pid(), File::string(), machi_dt:file_offset(), - Chunk::binary(), CSum::binary(), Timeout::non_neg_integer()) -> + Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), Timeout::non_neg_integer()) -> ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). @@ -281,18 +284,19 @@ do_send_sync2({auth, User, Pass}, #state{sock=Sock}=S) -> Res = {bummer, {X, Y, erlang:get_stacktrace()}}, {Res, S} end; -do_send_sync2({append_chunk, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, ChunkExtra}, +do_send_sync2({append_chunk, NS, Prefix, Chunk, CSum, Opts}, #state{sock=Sock, sock_id=Index, count=Count}=S) -> try ReqID = <>, CSumT = convert_csum_req(CSum, Chunk), - Req = #mpb_appendchunkreq{coc_namespace=CoC_Namespace, - coc_locator=CoC_Locator, + {ChunkExtra, Pref, FailPref} = machi_pb_translate:conv_from_append_opts(Opts), + Req = #mpb_appendchunkreq{namespace=NS, prefix=Prefix, chunk=Chunk, csum=CSumT, - chunk_extra=ChunkExtra}, + chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=FailPref}, R1a = #mpb_request{req_id=ReqID, do_not_alter=1, append_chunk=Req}, Bin1a = machi_pb:encode_mpb_request(R1a), @@ -436,9 +440,15 @@ do_send_sync2({list_files}, {Res, S#state{count=Count+1}} end. +%% We only convert the checksum types that make sense here: +%% none or client_sha. None of the other types should be sent +%% to us via the PB high protocol. + convert_csum_req(none, Chunk) -> #mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=machi_util:checksum_chunk(Chunk)}; +convert_csum_req(<<>>, Chunk) -> + convert_csum_req(none, Chunk); convert_csum_req({client_sha, CSumBin}, _Chunk) -> #mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=CSumBin}. diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index cc8f728..c615912 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -34,7 +34,9 @@ -export([from_pb_request/1, from_pb_response/1, to_pb_request/2, - to_pb_response/3 + to_pb_response/3, + conv_from_append_opts/1, + conv_to_append_opts/1 ]). %% TODO: fixme cleanup @@ -50,19 +52,19 @@ from_pb_request(#mpb_ll_request{ {ReqID, {low_auth, undefined, User, Pass}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, - append_chunk=#mpb_ll_appendchunkreq{ + append_chunk=IR=#mpb_ll_appendchunkreq{ + namespace_version=NSVersion, + namespace=NS, + locator=NSLocator, epoch_id=PB_EpochID, - coc_namespace=CoC_Namespace, - coc_locator=CoC_Locator, prefix=Prefix, chunk=Chunk, - csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}, - chunk_extra=ChunkExtra}}) -> + csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) -> EpochID = conv_to_epoch_id(PB_EpochID), CSum_tag = conv_to_csum_tag(CSum_type), - {ReqID, {low_append_chunk, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra}}; + Opts = conv_to_append_opts(IR), + {ReqID, {low_append_chunk, NSVersion, NS, NSLocator, EpochID, + Prefix, Chunk, CSum_tag, CSum, Opts}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, write_chunk=#mpb_ll_writechunkreq{ @@ -172,15 +174,13 @@ from_pb_request(#mpb_request{req_id=ReqID, {ReqID, {high_auth, User, Pass}}; from_pb_request(#mpb_request{req_id=ReqID, append_chunk=IR=#mpb_appendchunkreq{}}) -> - #mpb_appendchunkreq{coc_namespace=CoC_namespace, - coc_locator=CoC_locator, + #mpb_appendchunkreq{namespace=NS, prefix=Prefix, chunk=Chunk, - csum=CSum, - chunk_extra=ChunkExtra} = IR, + csum=CSum} = IR, TaggedCSum = make_tagged_csum(CSum, Chunk), - {ReqID, {high_append_chunk, CoC_namespace, CoC_locator, Prefix, Chunk, - TaggedCSum, ChunkExtra}}; + Opts = conv_to_append_opts(IR), + {ReqID, {high_append_chunk, NS, Prefix, Chunk, TaggedCSum, Opts}}; from_pb_request(#mpb_request{req_id=ReqID, write_chunk=IR=#mpb_writechunkreq{}}) -> #mpb_writechunkreq{chunk=#mpb_chunk{file_name=File, @@ -391,20 +391,24 @@ to_pb_request(ReqID, {low_echo, _BogusEpochID, Msg}) -> to_pb_request(ReqID, {low_auth, _BogusEpochID, User, Pass}) -> #mpb_ll_request{req_id=ReqID, do_not_alter=2, auth=#mpb_authreq{user=User, password=Pass}}; -to_pb_request(ReqID, {low_append_chunk, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, ChunkExtra}) -> +to_pb_request(ReqID, {low_append_chunk, NSVersion, NS, NSLocator, EpochID, + Prefix, Chunk, CSum_tag, CSum, Opts}) -> PB_EpochID = conv_from_epoch_id(EpochID), CSum_type = conv_from_csum_tag(CSum_tag), PB_CSum = #mpb_chunkcsum{type=CSum_type, csum=CSum}, + {ChunkExtra, Pref, FailPref} = conv_from_append_opts(Opts), #mpb_ll_request{req_id=ReqID, do_not_alter=2, append_chunk=#mpb_ll_appendchunkreq{ + namespace_version=NSVersion, + namespace=NS, + locator=NSLocator, epoch_id=PB_EpochID, - coc_namespace=CoC_Namespace, - coc_locator=CoC_Locator, prefix=Prefix, chunk=Chunk, csum=PB_CSum, - chunk_extra=ChunkExtra}}; + chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=FailPref}}; to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}) -> PB_EpochID = conv_from_epoch_id(EpochID), CSum_type = conv_from_csum_tag(CSum_tag), @@ -504,7 +508,7 @@ to_pb_response(ReqID, {low_auth, _, _, _}, __TODO_Resp) -> #mpb_ll_response{req_id=ReqID, generic=#mpb_errorresp{code=1, msg="AUTH not implemented"}}; -to_pb_response(ReqID, {low_append_chunk, _EID, _N, _L, _Pfx, _Ch, _CST, _CS, _CE}, Resp)-> +to_pb_response(ReqID, {low_append_chunk, _NSV, _NS, _NSL, _EID, _Pfx, _Ch, _CST, _CS, _O}, Resp)-> case Resp of {ok, {Offset, Size, File}} -> Where = #mpb_chunkpos{offset=Offset, @@ -691,7 +695,7 @@ to_pb_response(ReqID, {high_auth, _User, _Pass}, _Resp) -> #mpb_response{req_id=ReqID, generic=#mpb_errorresp{code=1, msg="AUTH not implemented"}}; -to_pb_response(ReqID, {high_append_chunk, _CoC_n, _CoC_l, _Prefix, _Chunk, _TSum, _CE}, Resp)-> +to_pb_response(ReqID, {high_append_chunk, _NS, _Prefix, _Chunk, _TSum, _O}, Resp)-> case Resp of {ok, {Offset, Size, File}} -> Where = #mpb_chunkpos{offset=Offset, @@ -974,6 +978,27 @@ conv_from_boolean(false) -> conv_from_boolean(true) -> 1. +conv_from_append_opts(#append_opts{chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=FailPref}) -> + {ChunkExtra, Pref, conv_from_boolean(FailPref)}. + + +conv_to_append_opts(#mpb_appendchunkreq{ + chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=FailPref}) -> + #append_opts{chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=conv_to_boolean(FailPref)}; +conv_to_append_opts(#mpb_ll_appendchunkreq{ + chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=FailPref}) -> + #append_opts{chunk_extra=ChunkExtra, + preferred_file_name=Pref, + flag_fail_preferred=conv_to_boolean(FailPref)}. + conv_from_projection_v1(#projection_v1{epoch_number=Epoch, epoch_csum=CSum, author_server=Author, diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index e4bc0d2..947a307 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -57,10 +57,7 @@ %% FLU1 API -export([ %% File API - append_chunk/4, append_chunk/5, - append_chunk/6, append_chunk/7, - append_chunk_extra/5, append_chunk_extra/6, - append_chunk_extra/7, append_chunk_extra/8, + append_chunk/6, append_chunk/8, read_chunk/6, read_chunk/7, checksum_list/3, checksum_list/4, list_files/2, list_files/3, @@ -106,58 +103,17 @@ start_link(#p_srvr{}=I) -> %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. -append_chunk(PidSpec, EpochID, Prefix, Chunk) -> - append_chunk(PidSpec, EpochID, Prefix, Chunk, infinity). +append_chunk(PidSpec, NSInfo, EpochID, Prefix, Chunk, CSum) -> + append_chunk(PidSpec, NSInfo, EpochID, Prefix, Chunk, CSum, + #append_opts{}, infinity). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. -append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) -> - append_chunk_extra(PidSpec, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, 0, Timeout). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk) -> - append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk, infinity). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk, Timeout) -> - append_chunk_extra(PidSpec, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, 0, Timeout). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra) - when is_integer(ChunkExtra), ChunkExtra >= 0 -> - append_chunk_extra(PidSpec, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, ChunkExtra, infinity). - -%% @doc Append a chunk (binary- or iolist-style) of data to a file -%% with `Prefix'. - -append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) -> - append_chunk_extra(PidSpec, EpochID, - ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR, - Prefix, Chunk, ChunkExtra, Timeout). - -append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra) -> - append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, infinity). - -append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra, Timeout) -> - gen_server:call(PidSpec, {req, {append_chunk_extra, EpochID, - CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra}}, +append_chunk(PidSpec, NSInfo, EpochID, Prefix, Chunk, CSum, Opts, + Timeout) -> + gen_server:call(PidSpec, {req, {append_chunk, NSInfo, EpochID, + Prefix, Chunk, CSum, Opts, Timeout}}, Timeout). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. @@ -415,11 +371,11 @@ do_req_retry(_Req, 2, Err, S) -> do_req_retry(Req, Depth, _Err, S) -> do_req(Req, Depth + 1, try_connect(disconnect(S))). -make_req_fun({append_chunk_extra, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra}, +make_req_fun({append_chunk, NSInfo, EpochID, + Prefix, Chunk, CSum, Opts, Timeout}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:append_chunk_extra(Sock, EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, ChunkExtra) + fun() -> Mod:append_chunk(Sock, NSInfo, EpochID, + Prefix, Chunk, CSum, Opts, Timeout) end; make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> diff --git a/src/machi_util.erl b/src/machi_util.erl index aa5f070..bc885aa 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -49,7 +49,8 @@ %% Other wait_for_death/2, wait_for_life/2, bool2int/1, - int2bool/1 + int2bool/1, + ns_info_default/1 ]). -include("machi.hrl"). @@ -68,12 +69,12 @@ make_regname(Prefix) when is_list(Prefix) -> %% @doc Calculate a config file path, by common convention. --spec make_config_filename(string(), machi_dt:coc_namespace(), machi_dt:coc_locator(), string()) -> +-spec make_config_filename(string(), machi_dt:namespace(), machi_dt:locator(), string()) -> string(). -make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> - Locator_str = int_to_hexstr(CoC_Locator, 32), +make_config_filename(DataDir, NS, Locator, Prefix) -> + Locator_str = int_to_hexstr(Locator, 32), lists:flatten(io_lib:format("~s/config/~s^~s^~s", - [DataDir, Prefix, CoC_Namespace, Locator_str])). + [DataDir, Prefix, NS, Locator_str])). %% @doc Calculate a config file path, by common convention. @@ -102,19 +103,19 @@ make_checksum_filename(DataDir, FileName) -> %% @doc Calculate a file data file path, by common convention. --spec make_data_filename(string(), machi_dt:coc_namespace(), machi_dt:coc_locator(), string(), atom()|string()|binary(), integer()|string()) -> +-spec make_data_filename(string(), machi_dt:namespace(), machi_dt:locator(), string(), atom()|string()|binary(), integer()|string()) -> {binary(), string()}. -make_data_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, SequencerName, FileNum) +make_data_filename(DataDir, NS, Locator, Prefix, SequencerName, FileNum) when is_integer(FileNum) -> - Locator_str = int_to_hexstr(CoC_Locator, 32), + Locator_str = int_to_hexstr(Locator, 32), File = erlang:iolist_to_binary(io_lib:format("~s^~s^~s^~s^~w", - [Prefix, CoC_Namespace, Locator_str, SequencerName, FileNum])), + [Prefix, NS, Locator_str, SequencerName, FileNum])), make_data_filename2(DataDir, File); -make_data_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, SequencerName, String) +make_data_filename(DataDir, NS, Locator, Prefix, SequencerName, String) when is_list(String) -> - Locator_str = int_to_hexstr(CoC_Locator, 32), + Locator_str = int_to_hexstr(Locator, 32), File = erlang:iolist_to_binary(io_lib:format("~s^~s^~s^~s^~s", - [Prefix, CoC_Namespace, Locator_str, SequencerName, string])), + [Prefix, NS, Locator_str, SequencerName, string])), make_data_filename2(DataDir, File). make_data_filename2(DataDir, File) -> @@ -161,7 +162,7 @@ is_valid_filename(Filename) -> %% %% %% Invalid filenames will return an empty list. --spec parse_filename( Filename :: string() ) -> {} | {string(), machi_dt:coc_namespace(), machi_dt:coc_locator(), string(), string() }. +-spec parse_filename( Filename :: string() ) -> {} | {string(), machi_dt:namespace(), machi_dt:locator(), string(), string() }. parse_filename(Filename) -> case string:tokens(Filename, "^") of [Prefix, CoC_NS, CoC_Loc, UUID, SeqNo] -> @@ -181,10 +182,10 @@ parse_filename(Filename) -> %% @doc Read the file size of a config file, which is used as the %% basis for a minimum sequence number. --spec read_max_filenum(string(), machi_dt:coc_namespace(), machi_dt:coc_locator(), string()) -> +-spec read_max_filenum(string(), machi_dt:namespace(), machi_dt:locator(), string()) -> non_neg_integer(). -read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> - case file:read_file_info(make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix)) of +read_max_filenum(DataDir, NS, Locator, Prefix) -> + case file:read_file_info(make_config_filename(DataDir, NS, Locator, Prefix)) of {error, enoent} -> 0; {ok, FI} -> @@ -194,11 +195,11 @@ read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> %% @doc Increase the file size of a config file, which is used as the %% basis for a minimum sequence number. --spec increment_max_filenum(string(), machi_dt:coc_namespace(), machi_dt:coc_locator(), string()) -> +-spec increment_max_filenum(string(), machi_dt:namespace(), machi_dt:locator(), string()) -> ok | {error, term()}. -increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix) -> +increment_max_filenum(DataDir, NS, Locator, Prefix) -> try - {ok, FH} = file:open(make_config_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix), [append]), + {ok, FH} = file:open(make_config_filename(DataDir, NS, Locator, Prefix), [append]), ok = file:write(FH, "x"), ok = file:sync(FH), ok = file:close(FH) @@ -287,12 +288,25 @@ int_to_hexbin(I, I_size) -> checksum_chunk(Chunk) when is_binary(Chunk); is_list(Chunk) -> crypto:hash(sha, Chunk). +convert_csum_tag(A) when is_atom(A)-> + A; +convert_csum_tag(?CSUM_TAG_NONE) -> + ?CSUM_TAG_NONE_ATOM; +convert_csum_tag(?CSUM_TAG_CLIENT_SHA) -> + ?CSUM_TAG_CLIENT_SHA_ATOM; +convert_csum_tag(?CSUM_TAG_SERVER_SHA) -> + ?CSUM_TAG_SERVER_SHA_ATOM; +convert_csum_tag(?CSUM_TAG_SERVER_REGEN_SHA) -> + ?CSUM_TAG_SERVER_REGEN_SHA_ATOM. + %% @doc Create a tagged checksum make_tagged_csum(none) -> <>; +make_tagged_csum(<<>>) -> + <>; make_tagged_csum({Tag, CSum}) -> - make_tagged_csum(Tag, CSum). + make_tagged_csum(convert_csum_tag(Tag), CSum). %% @doc Makes tagged csum. Each meanings are: %% none / ?CSUM_TAG_NONE @@ -431,3 +445,10 @@ bool2int(true) -> 1; bool2int(false) -> 0. int2bool(0) -> false; int2bool(I) when is_integer(I) -> true. + +ns_info_default(#ns_info{}=NSInfo) -> + NSInfo; +ns_info_default(undefined) -> + #ns_info{}. + + diff --git a/src/machi_yessir_client.erl b/src/machi_yessir_client.erl index 1bdef2a..b26298a 100644 --- a/src/machi_yessir_client.erl +++ b/src/machi_yessir_client.erl @@ -22,6 +22,8 @@ -module(machi_yessir_client). +-ifdef(TODO_refactoring_deferred). + -include("machi.hrl"). -include("machi_projection.hrl"). @@ -509,3 +511,5 @@ disconnect(#yessir{name=Name}) -> %% =INFO REPORT==== 17-May-2015::18:57:52 === %% Repair success: tail a of [a] finished ap_mode repair ID {a,{1431,856671,140404}}: ok %% Stats [{t_in_files,0},{t_in_chunks,10413},{t_in_bytes,682426368},{t_out_files,0},{t_out_chunks,10413},{t_out_bytes,682426368},{t_bad_chunks,0},{t_elapsed_seconds,1.591}] + +-endif. % TODO_refactoring_deferred diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 1ebbbf3..cd4d813 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -44,6 +44,8 @@ verify_file_checksums_test2() -> TcpPort = 32958, DataDir = "./data", W_props = [{initial_wedged, false}], + NSInfo = undefined, + NoCSum = <<>>, try machi_test_util:start_flu_package(verify1_flu, TcpPort, DataDir, W_props), @@ -51,8 +53,8 @@ verify_file_checksums_test2() -> try Prefix = <<"verify_prefix">>, NumChunks = 10, - [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, - Prefix, <>) || + [{ok, _} = ?FLU_C:append_chunk(Sock1, NSInfo, ?DUMMY_PV1_EPOCH, + Prefix, <>, NoCSum) || X <- lists:seq(1, NumChunks)], {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), ?assertEqual({ok, []}, diff --git a/test/machi_ap_repair_eqc.erl b/test/machi_ap_repair_eqc.erl index 7d87d35..9c70474 100644 --- a/test/machi_ap_repair_eqc.erl +++ b/test/machi_ap_repair_eqc.erl @@ -118,7 +118,10 @@ append(CRIndex, Bin, #state{verbose=V}=S) -> {_SimSelfName, C} = lists:nth(CRIndex, CRList), Prefix = <<"pre">>, Len = byte_size(Bin), - Res = (catch machi_cr_client:append_chunk(C, Prefix, Bin, {sec(1), sec(1)})), + NSInfo = #ns_info{}, + NoCSum = <<>>, + Opts1 = #append_opts{}, + Res = (catch machi_cr_client:append_chunk(C, NSInfo, Prefix, Bin, NoCSum, Opts1, sec(1))), case Res of {ok, {_Off, Len, _FileName}=Key} -> case ets:insert_new(?WRITTEN_TAB, {Key, Bin}) of @@ -427,7 +430,7 @@ confirm_result(_T) -> 0 -> ok; _ -> DumpFailed = filename:join(DirBase, "dump-failed-" ++ Suffix), - ?V("Dump failed ETS tab to: ~w~n", [DumpFailed]), + ?V("Dump failed ETS tab to: ~s~n", [DumpFailed]), ets:tab2file(?FAILED_TAB, DumpFailed) end, case Critical of diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 5179fc8..e4f1171 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -107,6 +107,8 @@ smoke_test2() -> try Prefix = <<"pre">>, Chunk1 = <<"yochunk">>, + NSInfo = undefined, + NoCSum = <<>>, Host = "localhost", PortBase = 64454, Os = [{ignore_stability_time, true}, {active_mode, false}], @@ -114,12 +116,12 @@ smoke_test2() -> %% Whew ... ok, now start some damn tests. {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), - machi_cr_client:append_chunk(C1, Prefix, Chunk1), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, NoCSum), {ok, {Off1,Size1,File1}} = - machi_cr_client:append_chunk(C1, Prefix, Chunk1), - Chunk1_badcs = {<>, Chunk1}, + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, NoCSum), + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, {error, bad_checksum} = - machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, BadCSum), {ok, {[{_, Off1, Chunk1, _}], []}} = machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, @@ -173,18 +175,19 @@ smoke_test2() -> true = is_binary(KludgeBin), {error, bad_arg} = machi_cr_client:checksum_list(C1, <<"!!!!">>), -io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]), + io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]), %% Exactly one file right now, e.g., %% {ok,[{2098202,<<"pre^b144ef13-db4d-4c9f-96e7-caff02dc754f^1">>}]} {ok, [_]} = machi_cr_client:list_files(C1), - %% Go back and test append_chunk_extra() and write_chunk() + %% Go back and test append_chunk() + extra and write_chunk() Chunk10 = <<"It's a different chunk!">>, Size10 = byte_size(Chunk10), Extra10 = 5, + Opts1 = #append_opts{chunk_extra=Extra10*Size10}, {ok, {Off10,Size10,File10}} = - machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, - Extra10 * Size10), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk10, + NoCSum, Opts1), {ok, {[{_, Off10, Chunk10, _}], []}} = machi_cr_client:read_chunk(C1, File10, Off10, Size10, []), [begin @@ -198,7 +201,7 @@ io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]), machi_cr_client:read_chunk(C1, File10, Offx, Size10, []) end || Seq <- lists:seq(1, Extra10)], {ok, {Off11,Size11,File11}} = - machi_cr_client:append_chunk(C1, Prefix, Chunk10), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk10, NoCSum), %% %% Double-check that our reserved extra bytes were really honored! %% true = (Off11 > (Off10 + (Extra10 * Size10))), io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]), @@ -224,6 +227,8 @@ witness_smoke_test2() -> try Prefix = <<"pre">>, Chunk1 = <<"yochunk">>, + NSInfo = undefined, + NoCSum = <<>>, Host = "localhost", PortBase = 64444, Os = [{ignore_stability_time, true}, {active_mode, false}, @@ -233,12 +238,13 @@ witness_smoke_test2() -> %% Whew ... ok, now start some damn tests. {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), - {ok, _} = machi_cr_client:append_chunk(C1, Prefix, Chunk1), + {ok, _} = machi_cr_client:append_chunk(C1, NSInfo, Prefix, + Chunk1, NoCSum), {ok, {Off1,Size1,File1}} = - machi_cr_client:append_chunk(C1, Prefix, Chunk1), - Chunk1_badcs = {<>, Chunk1}, + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, NoCSum), + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, {error, bad_checksum} = - machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, BadCSum), {ok, {[{_, Off1, Chunk1, _}], []}} = machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), @@ -270,7 +276,8 @@ witness_smoke_test2() -> machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), %% But because the head is wedged, an append will fail. {error, partition} = - machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000), + machi_cr_client:append_chunk(C1, NSInfo, Prefix, Chunk1, NoCSum, + #append_opts{}, 1*1000), %% The witness's wedge status should cause timeout/partition %% for write_chunk also. diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index a1d098a..03097e4 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -91,6 +91,8 @@ flu_smoke_test() -> Host = "localhost", TcpPort = 12957, DataDir = "./data", + NSInfo = undefined, + NoCSum = <<>>, Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", W_props = [{initial_wedged, false}], @@ -108,17 +110,17 @@ flu_smoke_test() -> {ok, {false, _}} = ?FLU_C:wedge_status(Host, TcpPort), Chunk1 = <<"yo!">>, - {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, + {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - Prefix, Chunk1), + Prefix, Chunk1, NoCSum), {ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1, Off1, Len1, []), {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1), true = is_binary(KludgeBin), - {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, + {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - BadPrefix, Chunk1), + BadPrefix, Chunk1, NoCSum), {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH), Len1 = size(Chunk1), {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, @@ -135,16 +137,19 @@ flu_smoke_test() -> %% ?DUMMY_PV1_EPOCH, %% File1, Off1, Len1*9999), - {ok, {Off1b,Len1b,File1b}} = ?FLU_C:append_chunk(Host, TcpPort, + {ok, {Off1b,Len1b,File1b}} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - Prefix, Chunk1), + Prefix, Chunk1,NoCSum), Extra = 42, - {ok, {Off1c,Len1c,File1c}} = ?FLU_C:append_chunk_extra(Host, TcpPort, + Opts1 = #append_opts{chunk_extra=Extra}, + {ok, {Off1c,Len1c,File1c}} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - Prefix, Chunk1, Extra), + Prefix, Chunk1, NoCSum, + Opts1, infinity), {ok, {Off1d,Len1d,File1d}} = ?FLU_C:append_chunk(Host, TcpPort, + NSInfo, ?DUMMY_PV1_EPOCH, - Prefix, Chunk1), + Prefix, Chunk1,NoCSum), if File1b == File1c, File1c == File1d -> true = (Off1c == Off1b + Len1b), true = (Off1d == Off1c + Len1c + Extra); @@ -152,11 +157,6 @@ flu_smoke_test() -> exit(not_mandatory_but_test_expected_same_file_fixme) end, - Chunk1_cs = {<>, Chunk1}, - {ok, {Off1e,Len1e,File1e}} = ?FLU_C:append_chunk(Host, TcpPort, - ?DUMMY_PV1_EPOCH, - Prefix, Chunk1_cs), - Chunk2 = <<"yo yo">>, Len2 = byte_size(Chunk2), Off2 = ?MINIMUM_OFFSET + 77, @@ -238,13 +238,15 @@ bad_checksum_test() -> DataDir = "./data.bct", Opts = [{initial_wedged, false}], {_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), + NSInfo = undefined, try Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, - Chunk1_badcs = {<>, Chunk1}, - {error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort, + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, + {error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - Prefix, Chunk1_badcs), + Prefix, + Chunk1, BadCSum), ok after machi_test_util:stop_flu_package() @@ -256,6 +258,8 @@ witness_test() -> DataDir = "./data.witness", Opts = [{initial_wedged, false}, {witness_mode, true}], {_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), + NSInfo = undefined, + NoCSum = <<>>, try Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, @@ -268,8 +272,8 @@ witness_test() -> {ok, EpochID1} = ?FLU_C:get_latest_epochid(Host, TcpPort, private), %% Witness-protected ops all fail - {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, EpochID1, - Prefix, Chunk1), + {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, EpochID1, + Prefix, Chunk1, NoCSum), File = <<"foofile">>, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1, File, 9999, 9999, []), diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 378ff74..14d4640 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -85,9 +85,12 @@ partial_stop_restart2() -> machi_flu1_client:wedge_status(Addr, TcpPort) end, Append = fun({_,#p_srvr{address=Addr, port=TcpPort}}, EpochID) -> + NSInfo = undefined, + NoCSum = <<>>, machi_flu1_client:append_chunk(Addr, TcpPort, - EpochID, - <<"prefix">>, <<"data">>) + NSInfo, EpochID, + <<"prefix">>, + <<"data">>, NoCSum) end, try [Start(P) || P <- Ps], diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 16b125c..2371076 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -24,6 +24,7 @@ -ifdef(TEST). -ifndef(PULSE). +-include("machi.hrl"). -include("machi_pb.hrl"). -include("machi_projection.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -59,13 +60,16 @@ smoke_test2() -> CoC_l = 0, % CoC_locator (not implemented) Prefix = <<"prefix">>, Chunk1 = <<"Hello, chunk!">>, + NS = "", + NoCSum = <<>>, + Opts1 = #append_opts{}, {ok, {Off1, Size1, File1}} = - ?C:append_chunk(Clnt, CoC_n, CoC_l, Prefix, Chunk1, none, 0), + ?C:append_chunk(Clnt, NS, Prefix, Chunk1, NoCSum, Opts1), true = is_binary(File1), Chunk2 = "It's another chunk", CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)}, {ok, {Off2, Size2, File2}} = - ?C:append_chunk(Clnt, CoC_n, CoC_l, Prefix, Chunk2, CSum2, 1024), + ?C:append_chunk(Clnt, NS, Prefix, Chunk2, CSum2, Opts1), Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]], File3 = File2, Off3 = Off2 + iolist_size(Chunk2), @@ -110,8 +114,8 @@ smoke_test2() -> LargeBytes = binary:copy(<<"x">>, 1024*1024), LBCsum = {client_sha, machi_util:checksum_chunk(LargeBytes)}, {ok, {Offx, Sizex, Filex}} = - ?C:append_chunk(Clnt, CoC_n, CoC_l, - Prefix, LargeBytes, LBCsum, 0), + ?C:append_chunk(Clnt, NS, + Prefix, LargeBytes, LBCsum, Opts1), ok = ?C:trim_chunk(Clnt, Filex, Offx, Sizex), %% Make sure everything was trimmed diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index b8556b7..c280072 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -36,6 +36,8 @@ api_smoke_test() -> DataDir = "./data.api_smoke_flu", W_props = [{active_mode, false},{initial_wedged, false}], Prefix = <<"prefix">>, + NSInfo = undefined, + NoCSum = <<>>, try {[I], _, _} = machi_test_util:start_flu_package( @@ -43,35 +45,40 @@ api_smoke_test() -> {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH, - [{ok, {_,_,_}} = ?MUT:append_chunk(Prox1, - FakeEpoch, Prefix, <<"data">>, - infinity) || _ <- lists:seq(1,5)], + [{ok, {_,_,_}} = ?MUT:append_chunk( + Prox1, NSInfo, FakeEpoch, + Prefix, <<"data">>, NoCSum) || + _ <- lists:seq(1,5)], %% Stop the FLU, what happens? machi_test_util:stop_flu_package(), - [{error,partition} = ?MUT:append_chunk(Prox1, + [{error,partition} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, <<"data-stopped1">>, - infinity) || _ <- lists:seq(1,3)], + NoCSum) || _ <- lists:seq(1,3)], %% Start the FLU again, we should be able to do stuff immediately machi_test_util:start_flu_package(RegName, TcpPort, DataDir, [no_cleanup|W_props]), MyChunk = <<"my chunk data">>, {ok, {MyOff,MySize,MyFile}} = - ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, - infinity), + ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, MyChunk, + NoCSum), {ok, {[{_, MyOff, MyChunk, _}], []}} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []), MyChunk2 = <<"my chunk data, yeah, again">>, + Opts1 = #append_opts{chunk_extra=4242}, {ok, {MyOff2,MySize2,MyFile2}} = - ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix, - MyChunk2, 4242, infinity), + ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, + MyChunk2, NoCSum, Opts1, infinity), {ok, {[{_, MyOff2, MyChunk2, _}], []}} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []), - MyChunk_badcs = {<>, MyChunk}, - {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch, - Prefix, MyChunk_badcs), - {error, bad_checksum} = ?MUT:write_chunk(Prox1, FakeEpoch, - <<"foo-file^^0^1^1">>, 99832, - MyChunk_badcs), + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, + {error, bad_checksum} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, + Prefix, MyChunk, BadCSum), + Opts2 = #append_opts{chunk_extra=99832}, +io:format(user, "\nTODO: fix write_chunk() call below @ ~s LINE ~w\n", [?MODULE,?LINE]), + %% {error, bad_checksum} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, + %% <<"foo-file^^0^1^1">>, + %% MyChunk, BadCSum, + %% Opts2, infinity), %% Put kick_projection_reaction() in the middle of the test so %% that any problems with its async nature will (hopefully) @@ -111,6 +118,8 @@ flu_restart_test2() -> TcpPort = 17125, DataDir = "./data.api_smoke_flu2", W_props = [{initial_wedged, false}, {active_mode, false}], + NSInfo = undefined, + NoCSum = <<>>, try {[I], _, _} = machi_test_util:start_flu_package( @@ -120,9 +129,8 @@ flu_restart_test2() -> FakeEpoch = ?DUMMY_PV1_EPOCH, Data = <<"data!">>, Dataxx = <<"Fake!">>, - {ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1, - FakeEpoch, <<"prefix">>, Data, - infinity), + {ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1, NSInfo, + FakeEpoch, <<"prefix">>, Data, NoCSum), P_a = #p_srvr{name=a, address="localhost", port=6622}, P1 = machi_projection:new(1, RegName, [P_a], [], [RegName], [], []), P1xx = P1#projection_v1{dbg2=["dbg2 changes are ok"]}, @@ -146,6 +154,7 @@ flu_restart_test2() -> %% makes the code a bit convoluted. (No LFE or %% Elixir macros here, alas, they'd be useful.) + AppendOpts1 = #append_opts{chunk_extra=42}, ExpectedOps = [ fun(run) -> ?assertEqual({ok, EpochID}, ?MUT:get_epoch_id(Prox1)), @@ -227,20 +236,22 @@ flu_restart_test2() -> (stop) -> ?MUT:get_all_projections(Prox1, private) end, fun(run) -> {ok, {_,_,_}} = - ?MUT:append_chunk(Prox1, FakeEpoch, - <<"prefix">>, Data, infinity), + ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, + <<"prefix">>, Data, NoCSum), ok; (line) -> io:format("line ~p, ", [?LINE]); - (stop) -> ?MUT:append_chunk(Prox1, FakeEpoch, - <<"prefix">>, Data, infinity) + (stop) -> ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, + <<"prefix">>, Data, NoCSum) end, fun(run) -> {ok, {_,_,_}} = - ?MUT:append_chunk_extra(Prox1, FakeEpoch, - <<"prefix">>, Data, 42, infinity), + ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, + <<"prefix">>, Data, NoCSum, + AppendOpts1, infinity), ok; (line) -> io:format("line ~p, ", [?LINE]); - (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, - <<"prefix">>, Data, 42, infinity) + (stop) -> ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, + <<"prefix">>, Data, NoCSum, + AppendOpts1, infinity) end, fun(run) -> {ok, {[{_, Off1, Data, _}], []}} = ?MUT:read_chunk(Prox1, FakeEpoch, diff --git a/test/machi_test_util.erl b/test/machi_test_util.erl index ff908b7..70b02af 100644 --- a/test/machi_test_util.erl +++ b/test/machi_test_util.erl @@ -83,7 +83,7 @@ stop_machi_sup() -> undefined -> ok; Pid -> catch exit(whereis(machi_sup), normal), - machi_util:wait_for_death(Pid, 30) + machi_util:wait_for_death(Pid, 100) end. clean_up(FluInfo) ->