diff --git a/.gitignore b/.gitignore index 80da416..e10eb87 100644 --- a/.gitignore +++ b/.gitignore @@ -1,20 +1,14 @@ prototype/chain-manager/patch.* +.dialyzer-last-run.txt .eqc-info .eunit deps +*.plt erl_crash.dump .concrete/DEV_MODE .rebar edoc -# Dialyzer stuff -.dialyzer-last-run.txt -.ebin.native -.local_dialyzer_plt -dialyzer_unhandled_warnings -dialyzer_warnings -*.plt - # PB artifacts for Erlang include/machi_pb.hrl diff --git a/Makefile b/Makefile index a93a383..9db0122 100644 --- a/Makefile +++ b/Makefile @@ -34,6 +34,14 @@ deps: clean: $(REBAR) -r clean +test: deps compile eunit + +eunit: + $(REBAR) -v skip_deps=true eunit + +edoc: edoc-clean + $(REBAR) skip_deps=true doc + edoc-clean: rm -f edoc/*.png edoc/*.html edoc/*.css edoc/edoc-info @@ -42,6 +50,33 @@ pulse: compile #env USE_PULSE=1 $(REBAR) skip_deps=true clean compile #env USE_PULSE=1 $(REBAR) skip_deps=true -D PULSE eunit -v +APPS = kernel stdlib sasl erts ssl compiler eunit crypto +PLT = $(HOME)/.machi_dialyzer_plt + +build_plt: deps compile + dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin + +DIALYZER_DEP_APPS = ebin/machi_pb.beam \ deps/cluster_info/ebin \ deps/protobuffs/ebin \ deps/riak_dt/ebin +### DIALYZER_FLAGS = -Wno_return -Wrace_conditions -Wunderspecs +DIALYZER_FLAGS = -Wno_return -Wrace_conditions + +dialyzer: deps compile + dialyzer $(DIALYZER_FLAGS) --plt $(PLT) ebin $(DIALYZER_DEP_APPS) | \ tee ./.dialyzer-last-run.txt | \ egrep -v -f ./filter-dialyzer-dep-warnings + +dialyzer-test: deps compile + echo Force rebar to recompile .eunit dir w/o running tests > /dev/null + $(REBAR) skip_deps=true eunit suite=lamport_clock + dialyzer $(DIALYZER_FLAGS) --plt $(PLT) .eunit $(DIALYZER_DEP_APPS) | \ egrep -v -f ./filter-dialyzer-dep-warnings + +clean_plt: + rm $(PLT) + ## ## Release targets ## @@ -52,8 +87,3 @@ relclean: stage : rel $(foreach dep,$(wildcard deps/*), rm -rf rel/$(REPO)/lib/$(shell basename $(dep))* && ln -sf $(abspath $(dep)) rel/$(REPO)/lib;) - -DIALYZER_APPS = kernel stdlib sasl erts ssl compiler eunit crypto -PLT = $(HOME)/.machi_dialyzer_plt - -include tools.mk diff --git a/dialyzer.ignore-warnings b/dialyzer.ignore-warnings deleted file mode 100644 index db73995..0000000 --- a/dialyzer.ignore-warnings +++ /dev/null @@ -1,14 +0,0 @@ -### The auto-generated code of machi_pb.beam has some complaints, not fixed yet.
-machi_pb.erl:0: -################################################## -######## Specific types ##################### -################################################## - basho_bench_config:get/2 - machi_partition_simulator:get/1 -################################################## -######## Specific messages ##################### -################################################## -machi_chain_manager1.erl:2473: The created fun has no local return -machi_chain_manager1.erl:2102: The pattern <_P1, P2, Else = {'expected_author2', UPI1_tail, _}> can never match the type <#projection_v1{epoch_number::'undefined' | non_neg_integer(),epoch_csum::'undefined' | binary(),author_server::atom() | binary(),all_members::'undefined' | [atom() | binary()],witnesses::[atom() | binary()],creation_time::'undefined' | {non_neg_integer(),non_neg_integer(),non_neg_integer()},mode::'ap_mode' | 'cp_mode',upi::'undefined' | [atom() | binary()],repairing::'undefined' | [atom() | binary()],down::'undefined' | [atom() | binary()],dbg::'undefined' | [any()],dbg2::'undefined' | [any()],members_dict::'undefined' | [{_,_}]},#projection_v1{epoch_number::'undefined' | non_neg_integer(),epoch_csum::binary(),author_server::atom() | binary(),all_members::'undefined' | [atom() | binary()],witnesses::[atom() | binary()],creation_time::'undefined' | {non_neg_integer(),non_neg_integer(),non_neg_integer()},mode::'ap_mode' | 'cp_mode',upi::'undefined' | [atom() | binary()],repairing::'undefined' | [atom() | binary()],down::'undefined' | [atom() | binary()],dbg::'undefined' | [any()],dbg2::'undefined' | [any()],members_dict::'undefined' | [{_,_}]},'true'> -machi_chain_manager1.erl:2151: The pattern <_P1 = {'projection_v1', _, _, _, _, _, _, 'cp_mode', UPI1, Repairing1, _, _, _, _}, _P2 = {'projection_v1', _, _, _, _, _, _, 'cp_mode', UPI2, Repairing2, _, _, _, _}, Else = {'epoch_not_si', EpochX, 'not_gt', EpochY}> can never match the type <#projection_v1{epoch_number::'undefined' | non_neg_integer(),epoch_csum::'undefined' | binary(),author_server::atom() | binary(),all_members::'undefined' | [atom() | binary()],witnesses::[atom() | binary()],creation_time::'undefined' | {non_neg_integer(),non_neg_integer(),non_neg_integer()},mode::'ap_mode' | 'cp_mode',upi::'undefined' | [atom() | binary()],repairing::'undefined' | [atom() | binary()],down::'undefined' | [atom() | binary()],dbg::'undefined' | [any()],dbg2::'undefined' | [any()],members_dict::'undefined' | [{_,_}]},#projection_v1{epoch_number::'undefined' | non_neg_integer(),epoch_csum::binary(),author_server::atom() | binary(),all_members::'undefined' | [atom() | binary()],witnesses::[atom() | binary()],creation_time::'undefined' | {non_neg_integer(),non_neg_integer(),non_neg_integer()},mode::'ap_mode' | 'cp_mode',upi::'undefined' | [atom() | binary()],repairing::'undefined' | [atom() | binary()],down::'undefined' | [atom() | binary()],dbg::'undefined' | [any()],dbg2::'undefined' | [any()],members_dict::'undefined' | [{_,_}]},'true'> -machi_flu1.erl:246: The created fun has no local return diff --git a/filter-dialyzer-dep-warnings b/filter-dialyzer-dep-warnings new file mode 100644 index 0000000..6b2e4e1 --- /dev/null +++ b/filter-dialyzer-dep-warnings @@ -0,0 +1,21 @@ +####################### patterns for general errors in dep modules: +^riak_dt[a-z_]*\.erl: +^protobuffs\.erl: +^protobuffs_[a-z_]*\.erl: +^leexinc\.hrl:[0-9][0-9]*: +^machi_chain_manager1.erl:[0-9][0-9]*: Guard test RetrospectiveP::'false' =:= 'true' can never succeed +^machi_pb\.erl:[0-9][0-9]*: 
+^pokemon_pb\.erl:[0-9][0-9]*: +####################### patterns for unknown functions: +^ basho_bench_config:get/2 +^ erl_prettypr:format/1 +^ erl_syntax:form_list/1 +^ machi_partition_simulator:get/1 +####################### Dialyzer warnings that including test/* code will silence +### Test code includes more variety so these match errors disappear +machi_chain_manager1.erl.* The pattern .*P1.*P2.*expected_author2.*can never match the type +machi_chain_manager1.erl.* The pattern .*P1.*P2.*epoch_not_si.*can never match the type +### There are type spec problems in riak_dt which contaminate all of machi_fitness.erl +### machi_fitness.erl works in all common cases, so this must be a spec definition problem? +### Ignore machi_fitness.erl complaints until we can straighten this out. +^machi_fitness.erl: diff --git a/src/machi.proto b/src/machi.proto index 2cec895..e5d77d9 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -205,11 +205,15 @@ message Mpb_ReadChunkReq { // only makes sense if flag_no_checksum is not set). // TODO: not implemented yet. optional uint32 flag_no_chunk = 3 [default=0]; + + // TODO: not implemented yet. + optional uint32 flag_needs_trimmed = 4 [default=0]; } message Mpb_ReadChunkResp { required Mpb_GeneralStatusCode status = 1; repeated Mpb_Chunk chunks = 2; + repeated Mpb_ChunkPos trimmed = 3; } // High level API: trim_chunk() request & response @@ -410,11 +414,14 @@ message Mpb_LL_ReadChunkReq { // only makes sense if flag_checksum is not set). // TODO: not implemented yet. optional uint32 flag_no_chunk = 4 [default=0]; + + optional uint32 flag_needs_trimmed = 5 [default=0]; } message Mpb_LL_ReadChunkResp { required Mpb_GeneralStatusCode status = 1; repeated Mpb_Chunk chunks = 2; + repeated Mpb_ChunkPos trimmed = 3; } // Low level API: checksum_list() diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index f40fa3e..fb6dedb 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -76,7 +76,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) -> ReadChunk = fun(F, Offset, Size) -> case file:pread(FH, Offset, Size) of {ok, Bin} -> - {ok, [{F, Offset, Bin, undefined}]}; + {ok, {[{F, Offset, Bin, undefined}], []}}; Err -> Err end @@ -92,7 +92,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) -> verify_file_checksums_remote2(Sock1, EpochID, File) -> ReadChunk = fun(File_name, Offset, Size) -> ?FLU_C:read_chunk(Sock1, EpochID, - File_name, Offset, Size) + File_name, Offset, Size, []) end, verify_file_checksums_common(Sock1, EpochID, File, ReadChunk). @@ -117,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) -> verify_chunk_checksum(File, ReadChunk) -> fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> case ReadChunk(File, Offset, Size) of - {ok, [{_, Offset, Chunk, _}]} -> + {ok, {[{_, Offset, Chunk, _}], _}} -> CSum2 = machi_util:checksum_chunk(Chunk), if CSum == CSum2 -> Acc; diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 157ac5b..f96ba4e 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -226,8 +226,8 @@ test_read_latest_public_projection(Pid, ReadRepairP) -> %% local projection store.
init({MyName, InitMembersDict, MgrOpts}) -> - put(ttt, [?LINE]), - _ = random:seed(now()), + put(ttt, [?LINE]), + random:seed(now()), init_remember_down_list(), Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, InitWitness_list = Opt(witnesses, []), diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 1e26fd6..13f1413 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -105,10 +105,10 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> RepairMode = proplists:get_value(repair_mode, Opts, repair), Verb = proplists:get_value(verbose, Opts, true), Res = try - _ = [begin - {ok, Proxy} = machi_proxy_flu1_client:start_link(P), - Add(FLU, Proxy) - end || {FLU,P} <- MembersDict, lists:member(FLU, OurFLUs)], + [begin + {ok, Proxy} = machi_proxy_flu1_client:start_link(P), + Add(FLU, Proxy) + end || {FLU,P} <- MembersDict, lists:member(FLU, OurFLUs)], ProxiesDict = get(proxies_dict), D = dict:new(), @@ -280,8 +280,8 @@ make_repair_directives3([{Offset, Size, CSum, _FLU}=A|Rest0], true -> Src; false -> hd(Gots) end, - _ = [ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) || - FLU_m <- Missing], + [ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) || + FLU_m <- Missing], if Missing == [] -> noop; true -> @@ -324,7 +324,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> _ -> ok end, _T1 = os:timestamp(), - {ok, [{_, Offset, Chunk, _}]} = - machi_proxy_flu1_client:read_chunk( - SrcP, EpochID, File, Offset, Size, - ?SHORT_TIMEOUT), + %% TODO: support case multiple written or trimmed chunks returned + {ok, {[{_, Offset, Chunk, _}], _}} = + machi_proxy_flu1_client:read_chunk( + SrcP, EpochID, File, Offset, Size, [], + ?SHORT_TIMEOUT), @@ -332,19 +333,19 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> <<_Tag:1/binary, CSum/binary>> = TaggedCSum, case machi_util:checksum_chunk(Chunk) of CSum_now when CSum_now == CSum -> - _ = [begin - DstP = orddict:fetch(DstFLU, ProxiesDict), - _T3 = os:timestamp(), - ok = machi_proxy_flu1_client:write_chunk( - DstP, EpochID, File, Offset, Chunk, - ?SHORT_TIMEOUT), - _T4 = os:timestamp() - end || DstFLU <- MyDsts], - _ = ets:update_counter(ETS, in_chunks, 1), - _ = ets:update_counter(ETS, in_bytes, Size), + [begin + DstP = orddict:fetch(DstFLU, ProxiesDict), + _T3 = os:timestamp(), + ok = machi_proxy_flu1_client:write_chunk( + DstP, EpochID, File, Offset, Chunk, + ?SHORT_TIMEOUT), + _T4 = os:timestamp() + end || DstFLU <- MyDsts], + ets:update_counter(ETS, in_chunks, 1), + ets:update_counter(ETS, in_bytes, Size), N = length(MyDsts), - _ = ets:update_counter(ETS, out_chunks, N), - _ = ets:update_counter(ETS, out_bytes, N*Size), + ets:update_counter(ETS, out_chunks, N), + ets:update_counter(ETS, out_bytes, N*Size), Acc2; CSum_now -> error_logger:error_msg( @@ -363,8 +364,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> end, ok = lists:foldl(F, ok, Cmds), %% Copy this file's stats to the total counts. - _ = [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || - {L_K, T_K} <- EtsKeys], + [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || + {L_K, T_K} <- EtsKeys], Acc.
mbytes(N) -> diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 1960d4f..f6b712a 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -122,7 +122,7 @@ append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, write_chunk/4, write_chunk/5, - read_chunk/4, read_chunk/5, + read_chunk/5, read_chunk/6, trim_chunk/4, trim_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2, @@ -201,14 +201,14 @@ write_chunk(PidSpec, File, Offset, Chunk, Timeout0) -> %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -read_chunk(PidSpec, File, Offset, Size) -> - read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). +read_chunk(PidSpec, File, Offset, Size, Opts) -> + read_chunk(PidSpec, File, Offset, Size, Opts, ?DEFAULT_TIMEOUT). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -read_chunk(PidSpec, File, Offset, Size, Timeout0) -> +read_chunk(PidSpec, File, Offset, Size, Opts, Timeout0) -> {TO, Timeout} = timeout(Timeout0), - gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}}, + gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, Opts, TO}}, Timeout). %% @doc Trim a chunk of data of size `Size' from `File' at `Offset'. @@ -288,8 +288,8 @@ handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) -> do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S); handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) -> do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S); -handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) -> - do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S); +handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) -> + do_read_chunk(File, Offset, Size, Opts, 0, os:timestamp(), TO, S); handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) -> do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S); handle_call2({checksum_list, File, TO}, _From, S) -> @@ -503,11 +503,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO, iolist_size(Chunk)}) end. -do_read_chunk(File, Offset, Size, 0=Depth, STime, TO, +do_read_chunk(File, Offset, Size, Opts, 0=Depth, STime, TO, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty - do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S); -do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) -> - %% io:format(user, "read sleep1,", []), + do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S); +do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > TO -> @@ -517,22 +516,22 @@ do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) -> case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2); + do_read_chunk(File, Offset, Size, Opts, Depth + 1, STime, TO, S2); _ -> - do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2) + do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S2) end end. 
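+%% A minimal caller-side sketch (Clnt, File, Offset, and Size are
+%% hypothetical), assuming a started machi_cr_client: the Opts proplist
+%% added above is threaded through every retry, and a successful reply
+%% now pairs the written chunks with any trimmed ranges:
+%%
+%%     {ok, {Chunks, Trimmed}} =
+%%         machi_cr_client:read_chunk(Clnt, File, Offset, Size,
+%%                                    [{needs_trimmed, true}]),
+%%     [{_File, _Off, Bin, _TaggedCSum}|_] = Chunks,
+%%     true = is_binary(Bin).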
-do_read_chunk2(File, Offset, Size, Depth, STime, TO, +do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> UPI = readonly_flus(P), Tail = lists:last(UPI), ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, - File, Offset, Size, ?TIMEOUT) of - {ok, Chunks0} when is_list(Chunks0) -> + File, Offset, Size, Opts, ?TIMEOUT) of + {ok, {Chunks0, []}} when is_list(Chunks0) -> Chunks = trim_both_side(Chunks0, Offset, Offset + Size), - {reply, {ok, Chunks}, S}; + {reply, {ok, {Chunks, []}}, S}; %% {ok, BadChunk} -> %% %% TODO cleaner handling of bad chunks %% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, @@ -546,7 +545,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO, {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_read_chunk(File, Offset, Size, Depth, STime, TO, S); + do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, S); {error, not_written} -> read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); %% {reply, {error, not_written}, S}; @@ -624,10 +623,12 @@ read_repair2(ap_mode=ConsistencyMode, #state{proj=P}=S) -> Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of - {ok, Chunks, GotItFrom} when is_list(Chunks) -> + {ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) -> ToRepair = mutation_flus(P) -- [GotItFrom], - {Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom], + {Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S, {ok, Chunks}), + {ok, Chunks} = Reply0, + Reply = {ok, {Chunks, _Trimmed}}, {reply, Reply, S1}; {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? 
@@ -818,7 +819,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> run_middleworker_job(Fun, ArgList, WTimeout) -> Parent = self(), MiddleWorker = - spawn(fun() -> + spawn_link(fun() -> PidsMons = [spawn_monitor(fun() -> Res = (catch Fun(Arg)), @@ -859,17 +860,19 @@ try_to_find_chunk(Eligible, File, Offset, Size, Work = fun(FLU) -> Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:read_chunk(Proxy, EpochID, - File, Offset, Size) of - {ok, Chunks} when is_list(Chunks) -> - {FLU, {ok, Chunks}}; + %% TODO Trimmed is required here + File, Offset, Size, []) of + {ok, {_Chunks, _} = ChunksAndTrimmed} -> + {FLU, {ok, ChunksAndTrimmed}}; Else -> {FLU, Else} end end, Rs = run_middleworker_job(Work, Eligible, Timeout), - case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of - [{FoundFLU, {ok, Chunk}}|_] -> - {ok, Chunk, FoundFLU}; + + case [X || {_Fluname, {ok, {[{_,_,B,_}], _}}}=X <- Rs, is_binary(B)] of + [{FoundFLU, {ok, ChunkAndTrimmed}}|_] -> + {ok, ChunkAndTrimmed, FoundFLU}; [] -> RetryErrs = [partition, bad_epoch, wedged], case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index a149c68..7a5c996 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -52,6 +52,7 @@ sync/1, sync/2, read/3, + read/4, write/3, write/4, append/2, @@ -83,9 +84,9 @@ wedged = false :: boolean(), csum_file :: string()|undefined, csum_path :: string()|undefined, - eof_position = 0 :: non_neg_integer(), data_filehandle :: file:io_device(), csum_table :: machi_csum_table:table(), + eof_position = 0 :: non_neg_integer(), tref :: reference(), %% timer ref ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations ops = 0 :: non_neg_integer(), %% sum of all ops @@ -135,11 +136,22 @@ sync(_Pid, Type) -> {ok, [{Filename::string(), Offset :: non_neg_integer(), Data :: binary(), Checksum :: binary()}]} | {error, Reason :: term()}. -read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 - andalso is_integer(Length) andalso Length > 0 -> - gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT); -read(_Pid, Offset, Length) -> - lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]), +read(Pid, Offset, Length) -> + read(Pid, Offset, Length, []). + +-spec read(Pid :: pid(), + Offset :: non_neg_integer(), + Length :: non_neg_integer(), + [{no_checksum|no_chunk|needs_trimmed, boolean()}]) -> + {ok, [{Filename::string(), Offset :: non_neg_integer(), + Data :: binary(), Checksum :: binary()}]} | + {error, Reason :: term()}. +read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 + andalso is_integer(Length) andalso Length > 0 + andalso is_list(Opts) -> + gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT); +read(_Pid, Offset, Length, Opts) -> + lager:warning("Bad args to read: Offset ~p, Length ~p, Options ~p", [Offset, Length, Opts]), {error, bad_arg}. % @doc Write data at offset @@ -211,8 +223,8 @@ init({Filename, DataDir}) -> csum_table = CsumTable, tref = Tref, eof_position = Eof}, - lager:debug("Starting file proxy ~p for filename ~p, state = ~p", - [self(), Filename, St]), + lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p", + [self(), Filename, St, Eof]), {ok, St}. 
% @private @@ -250,13 +262,13 @@ handle_call({sync, all}, _From, State = #state{filename = F, %%% READS -handle_call({read, _Offset, _Length}, _From, +handle_call({read, _Offset, _Length, _}, _From, State = #state{wedged = true, reads = {T, Err} }) -> {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; -handle_call({read, Offset, Length}, _From, +handle_call({read, Offset, Length, _Opts}, _From, State = #state{eof_position = Eof, reads = {T, Err} }) when Offset > Eof -> @@ -265,23 +277,28 @@ handle_call({read, Offset, Length}, _From, [Offset, Length, Eof]), {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}}; -handle_call({read, Offset, Length}, _From, +handle_call({read, Offset, Length, Opts}, _From, State = #state{filename = F, data_filehandle = FH, csum_table = CsumTable, + eof_position = EofP, reads = {T, Err} }) -> + NoChecksum = proplists:get_value(no_checksum, Opts, false), + NoChunk = proplists:get_value(no_chunk, Opts, false), + NeedsMerge = proplists:get_value(needs_trimmed, Opts, false), {Resp, NewErr} = - case do_read(FH, F, CsumTable, Offset, Length) of - {ok, []} -> + case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of + {ok, {[], []}} -> {{error, not_written}, Err + 1}; - {ok, Chunks} -> - %% Kludge to wrap read result in tuples, to support fragmented read - %% XXX FIXME - %% For now we are omiting the checksum data because it blows up - %% protobufs. - {{ok, Chunks}, Err}; + {ok, {Chunks, Trimmed}} -> + %% Kludge to wrap read result in tuples, to support fragmented read + %% XXX FIXME + %% For now we are omiting the checksum data because it blows up + %% protobufs. + {{ok, {Chunks, Trimmed}}, Err}; Error -> + lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]), {Error, Err + 1} end, {reply, Resp, State#state{reads = {T+1, NewErr}}}; @@ -298,6 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From, State = #state{filename = F, writes = {T, Err}, data_filehandle = FHd, + eof_position=EofP, csum_table = CsumTable}) -> ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), @@ -318,6 +336,8 @@ handle_call({write, Offset, ClientMeta, Data}, _From, end end, {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), + lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n", + [iolist_size(Data), Offset, F, NewEof]), {reply, Resp, State#state{writes = {T+1, NewErr}, eof_position = NewEof}}; @@ -351,15 +371,14 @@ handle_call({append, ClientMeta, Extra, Data}, _From, ok -> {{ok, F, EofP}, Err}; Error -> - {Error, Err + 1, EofP} + {Error, Err + 1} end end, - %% TODO: do we check this with calling - %% machi_csum_table:calc_unwritten_bytes/1? NewEof = EofP + byte_size(Data) + Extra, + lager:debug("appended ~p bytes at ~p file ~p. 
NewEofP = ~p", + [iolist_size(Data), EofP, F, NewEof]), {reply, Resp, State#state{appends = {T+1, NewErr}, - eof_position = NewEof - }}; + eof_position = NewEof}}; handle_call(Req, _From, State) -> lager:warning("Unknown call: ~p", [Req]), @@ -500,7 +519,10 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> Filename :: string(), CsumTable :: machi_csum_table:table(), Offset :: non_neg_integer(), - Size :: non_neg_integer() + Size :: non_neg_integer(), + NoChecksum :: boolean(), + NoChunk :: boolean(), + NeedsTrimmed :: boolean() ) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} | {error, bad_checksum} | {error, partial_read} | @@ -519,6 +541,9 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> % tuple is returned. % % +do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) -> + do_read(FHd, Filename, CsumTable, Offset, Size). + do_read(FHd, Filename, CsumTable, Offset, Size) -> %% Note that find/3 only returns overlapping chunks, both borders %% are not aligned to original Offset and Size. @@ -526,7 +551,8 @@ do_read(FHd, Filename, CsumTable, Offset, Size) -> read_all_ranges(FHd, Filename, ChunkCsums, []). read_all_ranges(_, _, [], ReadChunks) -> - {ok, lists:reverse(ReadChunks)}; + %% TODO: currently returns empty list of trimmed chunks + {ok, {lists:reverse(ReadChunks), []}}; read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) -> case file:pread(FHd, Offset, Size) of @@ -592,12 +618,12 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) -> {error, Reason} end; [{Offset, Size, TaggedCsum}] -> - case do_read(FHd, Filename, CsumTable, Offset, Size) of + case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of {error, _} = E -> lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written", [E, Offset, Filename]), {error, server_insanity}; - {ok, [{_, Offset, Data, TaggedCsum}]} -> + {ok, {[{_, Offset, Data, TaggedCsum}], _}} -> %% TODO: what if different checksum got from do_read()? ok; {ok, _Other} -> @@ -632,7 +658,6 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) -> ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum), lager:debug("Successful write to checksum file for ~p", [Filename]), - %% io:format(user, "here, heh ~p~n", [?LINE]), ok; Other -> lager:error("Got ~p during write to file ~p at offset ~p, length ~p", diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index bf16198..affc09a 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -86,7 +86,7 @@ send_spam_to_everyone(Pid) -> init([{MyFluName}|Args]) -> RegName = machi_flu_psup:make_fitness_regname(MyFluName), register(RegName, self()), - {ok, _} = timer:send_interval(5000, debug_dump), + timer:send_interval(5000, debug_dump), UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false), {ok, #state{my_flu_name=MyFluName, reg_name=RegName, partition_simulator_p=UseSimulatorP, @@ -171,7 +171,7 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) -> %% hiding where we need this extra round of messages to *remove* a %% FLU from the active_unfit list? 
- _ = schedule_adjust_messages(lists:usort(Added_to_new ++ Dropped_from_new)), + schedule_adjust_messages(lists:usort(Added_to_new ++ Dropped_from_new)), case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of {true, true} -> error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit}); @@ -295,8 +295,8 @@ proxy_pid(Name, #state{proxies_dict=ProxiesDict}) -> calc_unfit(All_list, HosedAnnotations) -> G = digraph:new(), - _ = [digraph:add_vertex(G, V) || V <- All_list], - _ = [digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations], + [digraph:add_vertex(G, V) || V <- All_list], + [digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations], calc_unfit2(lists:sort(digraph:vertices(G)), G). calc_unfit2([], G) -> @@ -350,7 +350,7 @@ do_map_change(NewMap, DontSendList, MembersDict, #state{my_flu_name=_MyFluName, pending_map=OldMap}=S) -> send_spam(NewMap, DontSendList, MembersDict, S), ChangedServers = find_changed_servers(OldMap, NewMap, _MyFluName), - _ = schedule_adjust_messages(ChangedServers), + schedule_adjust_messages(ChangedServers), %% _OldMapV = map_value(OldMap), %% _MapV = map_value(NewMap), %% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n" diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 370271f..b4740fd 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -241,7 +241,7 @@ append_server_loop(FluPid, #state{wedged=Wedged_p, {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} -> %% Old is the one from our state, plain old 'EpochID' comes %% from the client. - _ = case OldEpochId == EpochID of + case OldEpochId == EpochID of true -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, Extra, FluName, EpochID) @@ -381,7 +381,7 @@ do_pb_ll_request2(EpochID, CMD, S) -> true -> %% We're at same epoch # but different checksum, or %% we're at a newer/bigger epoch #. - _ = wedge_myself(S#state.flu_name, CurrentEpochID), + wedge_myself(S#state.flu_name, CurrentEpochID), ok end, {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; @@ -450,9 +450,9 @@ do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, Chunk = {TaggedCSum, ChunkBin}, Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), {Res, S}; -do_pb_hl_request2({high_read_chunk, File, Offset, Size}, +do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size), + Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts), {Res, S}; do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, #state{high_clnt=Clnt}=S) -> @@ -548,13 +548,13 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa {error, bad_arg} end. -do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})-> +do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> %% TODO: Look inside Opts someday. 
case sanitize_file_string(File) of ok -> {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), - case machi_file_proxy:read(Pid, Offset, Size) of - {ok, Chunks} -> {ok, Chunks}; + case machi_file_proxy:read(Pid, Offset, Size, Opts) of + {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; Other -> Other end; _ -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index cc9f175..da90618 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -56,7 +56,7 @@ %% File API append_chunk/4, append_chunk/5, append_chunk_extra/5, append_chunk_extra/6, - read_chunk/5, read_chunk/6, + read_chunk/6, read_chunk/7, checksum_list/3, checksum_list/4, list_files/2, list_files/3, wedge_status/1, wedge_status/2, @@ -144,26 +144,28 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> +-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(), + proplists:proplist()) -> {ok, machi_dt:chunk_s()} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. -read_chunk(Sock, EpochID, File, Offset, Size) +read_chunk(Sock, EpochID, File, Offset, Size, Opts) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> - read_chunk2(Sock, EpochID, File, Offset, Size). + read_chunk2(Sock, EpochID, File, Offset, Size, Opts). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), - machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(), + proplists:proplist()) -> {ok, machi_dt:chunk_s()} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. -read_chunk(Host, TcpPort, EpochID, File, Offset, Size) +read_chunk(Host, TcpPort, EpochID, File, Offset, Size, Opts) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try - read_chunk2(Sock, EpochID, File, Offset, Size) + read_chunk2(Sock, EpochID, File, Offset, Size, Opts) after disconnect(Sock) end. @@ -516,12 +518,12 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% -read_chunk2(Sock, EpochID, File0, Offset, Size) -> +read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) -> ReqID = <<"id">>, File = machi_util:make_binary(File0), Req = machi_pb_translate:to_pb_request( ReqID, - {low_read_chunk, EpochID, File, Offset, Size, []}), + {low_read_chunk, EpochID, File, Offset, Size, Opts}), do_pb_request_common(Sock, ReqID, Req). append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 1d54d07..5509803 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -40,7 +40,7 @@ auth/3, auth/4, append_chunk/6, append_chunk/7, write_chunk/5, write_chunk/6, - read_chunk/4, read_chunk/5, + read_chunk/5, read_chunk/6, trim_chunk/4, trim_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2 @@ -94,11 +94,18 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). 
-read_chunk(PidSpec, File, Offset, Size) -> - read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). +-spec read_chunk(pid(), string(), pos_integer(), pos_integer(), + [{no_checksum | no_chunk | needs_trimmed, boolean()}]) -> + {ok, {list(), list()}} | {error, term()}. +read_chunk(PidSpec, File, Offset, Size, Options) -> + read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT). -read_chunk(PidSpec, File, Offset, Size, Timeout) -> - send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout). +-spec read_chunk(pid(), string(), pos_integer(), pos_integer(), + [{no_checksum | no_chunk | needs_trimmed, boolean()}], + pos_integer()) -> + {ok, {list(), list()}} | {error, term()}. +read_chunk(PidSpec, File, Offset, Size, Options, Timeout) -> + send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout). trim_chunk(PidSpec, File, Offset, Size) -> trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). @@ -282,13 +289,19 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum}, Res = {bummer, {X, Y, erlang:get_stacktrace()}}, {Res, S#state{count=Count+1}} end; -do_send_sync2({read_chunk, File, Offset, Size}, +do_send_sync2({read_chunk, File, Offset, Size, Options}, #state{sock=Sock, sock_id=Index, count=Count}=S) -> try ReqID = <>, + FlagNoChecksum = proplists:get_value(no_checksum, Options, false), + FlagNoChunk = proplists:get_value(no_chunk, Options, false), + NeedsTrimmed = proplists:get_value(needs_trimmed, Options, false), Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, offset=Offset, - chunk_size=Size}}, + chunk_size=Size}, + flag_no_checksum=machi_util:bool2int(FlagNoChecksum), + flag_no_chunk=machi_util:bool2int(FlagNoChunk), + flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}, R1a = #mpb_request{req_id=ReqID, do_not_alter=1, read_chunk=Req}, Bin1a = machi_pb:encode_mpb_request(R1a), @@ -416,7 +429,7 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) -> convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) -> convert_general_status_code(Status). -convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) -> +convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks, trimmed=PB_Trimmed}) -> Chunks = lists:map(fun(#mpb_chunk{offset=Offset, file_name=File, chunk=Chunk, @@ -425,7 +438,12 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) -> Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>, {File, Offset, Chunk, Csum} end, PB_Chunks), - {ok, Chunks}; + Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}) -> + {File, Offset, Size} + end, PB_Trimmed), + {ok, {Chunks, Trimmed}}; convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) -> convert_general_status_code(Status).
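A quick usage sketch of the reshaped high-level read (client setup elided; Clnt, File, Offset, and Size are illustrative):

    {ok, {Chunks, Trimmed}} =
        machi_pb_high_client:read_chunk(Clnt, File, Offset, Size,
                                        [{needs_trimmed, true}]),
    %% Chunks  :: [{File, Offset, Bytes, TaggedCSum}]
    %% Trimmed :: [{File, Offset, Size}], [] until trim support lands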
diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index be0d75e..cc26766 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -75,13 +75,15 @@ from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{ req_id=ReqID, read_chunk=#mpb_ll_readchunkreq{ - epoch_id=PB_EpochID, - chunk_pos=ChunkPos, - flag_no_checksum=PB_GetNoChecksum, - flag_no_chunk=PB_GetNoChunk}}) -> + epoch_id=PB_EpochID, + chunk_pos=ChunkPos, + flag_no_checksum=PB_GetNoChecksum, + flag_no_chunk=PB_GetNoChunk, + flag_needs_trimmed=PB_NeedsTrimmed}}) -> EpochID = conv_to_epoch_id(PB_EpochID), Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)}, - {no_chunk, conv_to_boolean(PB_GetNoChunk)}], + {no_chunk, conv_to_boolean(PB_GetNoChunk)}, + {needs_trimmed, conv_to_boolean(PB_NeedsTrimmed)}], #mpb_chunkpos{file_name=File, offset=Offset, chunk_size=Size} = ChunkPos, @@ -177,8 +179,15 @@ from_pb_request(#mpb_request{req_id=ReqID, read_chunk=IR=#mpb_readchunkreq{}}) -> #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, offset=Offset, - chunk_size=Size}} = IR, - {ReqID, {high_read_chunk, File, Offset, Size}}; + chunk_size=Size}, + flag_no_checksum=FlagNoChecksum, + flag_no_chunk=FlagNoChunk, + flag_needs_trimmed=NeedsTrimmed} = IR, + %% I want MAPS + Options = [{no_checksum, machi_util:int2bool(FlagNoChecksum)}, + {no_chunk, machi_util:int2bool(FlagNoChunk)}, + {needs_trimmed, machi_util:int2bool(NeedsTrimmed)}], + {ReqID, {high_read_chunk, File, Offset, Size, Options}}; from_pb_request(#mpb_request{req_id=ReqID, trim_chunk=IR=#mpb_trimchunkreq{}}) -> #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, @@ -233,7 +242,8 @@ from_pb_response(#mpb_ll_response{ from_pb_response(#mpb_ll_response{ req_id=ReqID, read_chunk=#mpb_ll_readchunkresp{status=Status, - chunks=PB_Chunks}}) -> + chunks=PB_Chunks, + trimmed=PB_Trimmed}}) -> case Status of 'OK' -> Chunks = lists:map(fun(#mpb_chunk{file_name=File, offset=Offset, chunk=Bytes, csum=#mpb_chunkcsum{type=T, csum=Ck}}) -> Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>, {File, Offset, Bytes, Csum} end, PB_Chunks), - {ReqID, {ok, Chunks}}; + Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}) -> + {File, Offset, Size} + end, PB_Trimmed), + {ReqID, {ok, {Chunks, Trimmed}}}; _ -> {ReqID, machi_pb_high_client:convert_general_status_code(Status)} end; @@ -382,17 +397,23 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C offset=Offset, chunk=Chunk, csum=PB_CSum}}}; -to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) -> +to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) -> %% TODO: stop ignoring Opts ^_^ PB_EpochID = conv_from_epoch_id(EpochID), + FNChecksum = proplists:get_value(no_checksum, Opts, false), + FNChunk = proplists:get_value(no_chunk, Opts, false), + NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false), #mpb_ll_request{ req_id=ReqID, do_not_alter=2, read_chunk=#mpb_ll_readchunkreq{ - epoch_id=PB_EpochID, - chunk_pos=#mpb_chunkpos{ - file_name=File, - offset=Offset, - chunk_size=Size}}}; + epoch_id=PB_EpochID, + chunk_pos=#mpb_chunkpos{ + file_name=File, + offset=Offset, + chunk_size=Size}, + flag_no_checksum=machi_util:bool2int(FNChecksum), + flag_no_chunk=machi_util:bool2int(FNChunk), + flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}}; to_pb_request(ReqID, {low_checksum_list, EpochID, File}) -> PB_EpochID = conv_from_epoch_id(EpochID), #mpb_ll_request{req_id=ReqID, do_not_alter=2, @@ -478,7 +499,7 @@
to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)-> write_chunk=#mpb_ll_writechunkresp{status=Status}}; to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> case Resp of - {ok, Chunks} -> + {ok, {Chunks, Trimmed}} -> PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) -> {Tag, Ck} = machi_util:unmake_tagged_csum(Csum), #mpb_chunk{file_name=File, @@ -487,9 +508,15 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}} end, Chunks), + PB_Trimmed = lists:map(fun({File, Offset, Size}) -> + #mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size} + end, Trimmed), #mpb_ll_response{req_id=ReqID, read_chunk=#mpb_ll_readchunkresp{status='OK', - chunks=PB_Chunks}}; + chunks=PB_Chunks, + trimmed=PB_Trimmed}}; {error, _}=Error -> Status = conv_from_status(Error), #mpb_ll_response{req_id=ReqID, @@ -654,10 +681,10 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R _Else -> make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) end; -to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> +to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size, _}, Resp) -> case Resp of - {ok, Chunks} -> - MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) -> + {ok, {Chunks, Trimmed}} -> + PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) -> {Tag, Ck} = machi_util:unmake_tagged_csum(Csum), #mpb_chunk{ offset=Offset, @@ -665,9 +692,15 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> chunk=Bytes, csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}} end, Chunks), + PB_Trimmed = lists:map(fun({File, Offset, Size}) -> + #mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size} + end, Trimmed), #mpb_response{req_id=ReqID, read_chunk=#mpb_readchunkresp{status='OK', - chunks=MpbChunks}}; + chunks=PB_Chunks, + trimmed=PB_Trimmed}}; {error, _}=Error -> Status = conv_from_status(Error), #mpb_response{req_id=ReqID, diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index aa9f776..93f3b95 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -59,7 +59,7 @@ %% File API append_chunk/4, append_chunk/5, append_chunk_extra/5, append_chunk_extra/6, - read_chunk/5, read_chunk/6, + read_chunk/6, read_chunk/7, checksum_list/3, checksum_list/4, list_files/2, list_files/3, wedge_status/1, wedge_status/2, @@ -130,13 +130,13 @@ append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) -> %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -read_chunk(PidSpec, EpochID, File, Offset, Size) -> - read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). +read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) -> + read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, infinity). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> - gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, +read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) -> + gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size, Opts}}, Timeout). %% @doc Fetch the list of chunk checksums for `File'. 
@@ -299,8 +299,8 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> Timeout) of {error, written}=Err -> Size = byte_size(Chunk), - case read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) of - {ok, Chunk2} when Chunk2 == Chunk -> + case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of + {ok, {[{File, Offset, Chunk2, _}], []}} when Chunk2 == Chunk -> %% See equivalent comment inside write_projection(). ok; _ -> @@ -377,9 +377,9 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk}, make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end; -make_req_fun({read_chunk, EpochID, File, Offset, Size}, +make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end; + fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size, Opts) end; make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end; diff --git a/src/machi_sequencer.erl b/src/machi_sequencer.erl new file mode 100644 index 0000000..4d1116d --- /dev/null +++ b/src/machi_sequencer.erl @@ -0,0 +1,194 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc "Mothballed" sequencer code, perhaps to be reused sometime in +%% the future? + +-module(machi_sequencer). + +-compile(export_all). + +-include_lib("kernel/include/file.hrl"). + +-define(CONFIG_DIR, "./config"). +-define(DATA_DIR, "./data"). + +seq(Server, Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 -> + Server ! {seq, self(), Prefix, Size}, + receive + {assignment, File, Offset} -> + {File, Offset} + after 1*1000 -> + bummer + end. + +seq_direct(Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 -> + RegName = make_regname(Prefix), + seq(RegName, Prefix, Size). + +start_server() -> + start_server(?MODULE). + +start_server(Name) -> + spawn_link(fun() -> run_server(Name) end). + +run_server(Name) -> + register(Name, self()), + ets:new(?MODULE, [named_table, public, {write_concurrency, true}]), + server_loop(). + +server_loop() -> + receive + {seq, From, Prefix, Size} -> + spawn(fun() -> server_dispatch(From, Prefix, Size) end), + server_loop() + end. + +server_dispatch(From, Prefix, Size) -> + RegName = make_regname(Prefix), + case whereis(RegName) of + undefined -> + start_prefix_server(Prefix), + timer:sleep(1), + server_dispatch(From, Prefix, Size); + Pid -> + Pid ! {seq, From, Prefix, Size} + end, + exit(normal). + +start_prefix_server(Prefix) -> + spawn(fun() -> run_prefix_server(Prefix) end). 
+ +run_prefix_server(Prefix) -> + true = register(make_regname(Prefix), self()), + ok = filelib:ensure_dir(?CONFIG_DIR ++ "/unused"), + ok = filelib:ensure_dir(?DATA_DIR ++ "/unused"), + FileNum = read_max_filenum(Prefix) + 1, + ok = increment_max_filenum(Prefix), + prefix_server_loop(Prefix, FileNum). + +prefix_server_loop(Prefix, FileNum) -> + File = make_data_filename(Prefix, FileNum), + prefix_server_loop(Prefix, File, FileNum, 0). + +prefix_server_loop(Prefix, File, FileNum, Offset) -> + receive + {seq, From, Prefix, Size} -> + From ! {assignment, File, Offset}, + prefix_server_loop(Prefix, File, FileNum, Offset + Size) + after 30*1000 -> + io:format("timeout: ~p server stopping\n", [Prefix]), + exit(normal) + end. + +make_regname(Prefix) -> + erlang:binary_to_atom(Prefix, latin1). + +make_config_filename(Prefix) -> + lists:flatten(io_lib:format("~s/~s", [?CONFIG_DIR, Prefix])). + +make_data_filename(Prefix, FileNum) -> + erlang:iolist_to_binary(io_lib:format("~s/~s.~w", + [?DATA_DIR, Prefix, FileNum])). + +read_max_filenum(Prefix) -> + case file:read_file_info(make_config_filename(Prefix)) of + {error, enoent} -> + 0; + {ok, FI} -> + FI#file_info.size + end. + +increment_max_filenum(Prefix) -> + {ok, FH} = file:open(make_config_filename(Prefix), [append]), + ok = file:write(FH, "x"), + %% ok = file:sync(FH), + ok = file:close(FH). + +%%%%%%%%%%%%%%%%% + +%% basho_bench callbacks + +-define(SEQ, ?MODULE). + +new(1) -> + start_server(), + timer:sleep(100), + {ok, unused}; +new(_Id) -> + {ok, unused}. + +run(null, _KeyGen, _ValgueGen, State) -> + {ok, State}; +run(keygen_then_null, KeyGen, _ValgueGen, State) -> + _Prefix = KeyGen(), + {ok, State}; +run(seq, KeyGen, _ValgueGen, State) -> + Prefix = KeyGen(), + {_, _} = ?SEQ:seq(?SEQ, Prefix, 1), + {ok, State}; +run(seq_direct, KeyGen, _ValgueGen, State) -> + Prefix = KeyGen(), + Name = ?SEQ:make_regname(Prefix), + case get(Name) of + undefined -> + case whereis(Name) of + undefined -> + {_, _} = ?SEQ:seq(?SEQ, Prefix, 1); + Pid -> + put(Name, Pid), + {_, _} = ?SEQ:seq(Pid, Prefix, 1) + end; + Pid -> + {_, _} = ?SEQ:seq(Pid, Prefix, 1) + end, + {ok, State}; +run(seq_ets, KeyGen, _ValgueGen, State) -> + Tab = ?MODULE, + Prefix = KeyGen(), + Res = try + BigNum = ets:update_counter(Tab, Prefix, 1), + BigBin = <>, + <> = BigBin, + %% if Offset rem 1000 == 0 -> + %% io:format("~p,~p ", [FileNum, Offset]); + %% true -> + %% ok + %% end, + {fakefake, FileNum, Offset} + catch error:badarg -> + FileNum2 = 1, Offset2 = 0, + FileBin = <>, + OffsetBin = <>, + Glop = <>, + <> = Glop, + %% if Prefix == <<"42">> -> io:format("base:~w\n", [Base]); true -> ok end, + %% Base = 0, + case ets:insert_new(Tab, {Prefix, Base}) of + true -> + {<<"fakefakefake">>, Base}; + false -> + Result2 = ets:update_counter(Tab, Prefix, 1), + {<<"fakefakefake">>, Result2} + end + end, + Res = Res, + {ok, State}. + diff --git a/src/machi_util.erl b/src/machi_util.erl index fe5f04c..358a31d 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -47,7 +47,9 @@ combinations/1, ordered_combinations/1, mk_order/2, %% Other - wait_for_death/2, wait_for_life/2 + wait_for_death/2, wait_for_life/2, + bool2int/1, + int2bool/1 ]). -include("machi.hrl"). @@ -390,3 +392,9 @@ mk_order(UPI2, Repair1) -> error -> error end || X <- UPI2], UPI2_order. + +%% C-style conversion for PB usage. +bool2int(true) -> 1; +bool2int(false) -> 0. +int2bool(0) -> false; +int2bool(I) when is_integer(I) -> true. 
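A small sketch of the new machi_util helpers as used at the PB flag boundary (values illustrative): flags travel as uint32, and any nonzero integer maps back to true.

    1 = machi_util:bool2int(true),
    0 = machi_util:bool2int(false),
    false = machi_util:int2bool(0),
    true = machi_util:int2bool(1),
    true = machi_util:int2bool(42).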
diff --git a/src/machi_yessir_client.erl b/src/machi_yessir_client.erl index f37b618..a7cb75d 100644 --- a/src/machi_yessir_client.erl +++ b/src/machi_yessir_client.erl @@ -450,18 +450,18 @@ connect(#p_srvr{name=Name, props=Props})-> chunk_size=ChunkSize }, %% Add fake dict entries for these files - _ = [begin - Prefix = list_to_binary(io_lib:format("fake~w", [X])), - {ok, _} = append_chunk_extra(Sock, {1,<<"unused">>}, Prefix, <<>>, FileSize) - end || X <- lists:seq(1, NumFiles)], + [begin + Prefix = list_to_binary(io_lib:format("fake~w", [X])), + {ok, _} = append_chunk_extra(Sock, {1,<<"unused">>}, Prefix, <<>>, FileSize) + end || X <- lists:seq(1, NumFiles)], Sock. disconnect(#yessir{name=Name}) -> - _ = [erase(K) || {{N,offset,_}=K, _V} <- get(), N == Name], - _ = [erase(K) || {{N,chunk,_}=K, _V} <- get(), N == Name], - _ = [erase(K) || {{N,csum,_}=K, _V} <- get(), N == Name], - _ = [erase(K) || {{N,proj,_,_}=K, _V} <- get(), N == Name], + [erase(K) || {{N,offset,_}=K, _V} <- get(), N == Name], + [erase(K) || {{N,chunk,_}=K, _V} <- get(), N == Name], + [erase(K) || {{N,csum,_}=K, _V} <- get(), N == Name], + [erase(K) || {{N,proj,_,_}=K, _V} <- get(), N == Name], ok. %% Example use: diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 100ff03..f5d3513 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -114,13 +114,14 @@ smoke_test2() -> Chunk1_badcs = {<>, Chunk1}, {error, bad_checksum} = machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), - {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, {[{_, Off1, Chunk1, _}], []}} = + machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, private), %% Verify that the client's CR wrote to all of them. - [{ok, [{_, Off1, Chunk1, _}]} = + [{ok, {[{_, Off1, Chunk1, _}], []}} = machi_flu1_client:read_chunk( - Host, PortBase+X, EpochID, File1, Off1, Size1) || + Host, PortBase+X, EpochID, File1, Off1, Size1, []) || X <- [0,1,2] ], %% Test read repair: Manually write to head, then verify that @@ -128,13 +129,19 @@ smoke_test2() -> FooOff1 = Off1 + (1024*1024), [{error, not_written} = machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, - File1, FooOff1, Size1) || X <- [0,1,2] ], + File1, FooOff1, Size1, []) || X <- [0,1,2] ], ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, File1, FooOff1, Chunk1), - {ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), - [{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk( + {ok, {[{_, FooOff1, Chunk1, _}], []}} = + machi_flu1_client:read_chunk(Host, PortBase+0, EpochID, + File1, FooOff1, Size1, []), + {ok, {[{_, FooOff1, Chunk1, _}], []}} = + machi_cr_client:read_chunk(C1, File1, FooOff1, Size1, []), + [?assertMatch({X,{ok, {[{_, FooOff1, Chunk1, _}], []}}}, + {X,machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, - File1, FooOff1, Size1)} || X <- [0,1,2] ], + File1, FooOff1, Size1, [])}) + || X <- [0,1,2] ], %% Test read repair: Manually write to middle, then same checking. 
FooOff2 = Off1 + (2*1024*1024), @@ -142,18 +149,19 @@ smoke_test2() -> Size2 = size(Chunk2), ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, File1, FooOff2, Chunk2), - {ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), - [{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk( - Host, PortBase+X, EpochID, - File1, FooOff2, Size2)} || X <- [0,1,2] ], + {ok, {[{_, FooOff2, Chunk2, _}], []}} = + machi_cr_client:read_chunk(C1, File1, FooOff2, Size2, []), + [{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} = + {X,machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff2, Size2, [])} || X <- [0,1,2] ], %% Misc API smoke & minor regression checks {error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>, - 999999999, 1), - {ok, [{_,Off1,Chunk1,_}, - {_,FooOff1,Chunk1,_}, - {_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1, - Off1, 88888888), + 999999999, 1, []), + {ok, {[{_,Off1,Chunk1,_}, {_,FooOff1,Chunk1,_}, {_,FooOff2,Chunk2,_}], + []}} = + machi_cr_client:read_chunk(C1, File1, Off1, 88888888, []), %% Checksum list return value is a primitive binary(). {ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1), true = is_binary(KludgeBin), @@ -169,8 +177,8 @@ smoke_test2() -> {ok, {Off10,Size10,File10}} = machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, Extra10 * Size10), - {ok, [{_, Off10, Chunk10, _}]} = - machi_cr_client:read_chunk(C1, File10, Off10, Size10), + {ok, {[{_, Off10, Chunk10, _}], []}} = + machi_cr_client:read_chunk(C1, File10, Off10, Size10, []), [begin Offx = Off10 + (Seq * Size10), %% TODO: uncomment written/not_written enforcement is available. @@ -178,8 +186,8 @@ smoke_test2() -> %% Offx, Size10), {ok, {Offx,Size10,File10}} = machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), - {ok, [{_, Offx, Chunk10, _}]} = - machi_cr_client:read_chunk(C1, File10, Offx, Size10) + {ok, {[{_, Offx, Chunk10, _}], []}} = + machi_cr_client:read_chunk(C1, File10, Offx, Size10, []) end || Seq <- lists:seq(1, Extra10)], {ok, {Off11,Size11,File11}} = machi_cr_client:append_chunk(C1, Prefix, Chunk10), @@ -222,7 +230,8 @@ witness_smoke_test2() -> Chunk1_badcs = {<>, Chunk1}, {error, bad_checksum} = machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), - {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, {[{_, Off1, Chunk1, _}], []}} = + machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), %% Stop 'b' and let the chain reset. ok = machi_flu_psup:stop_flu_package(b), @@ -248,8 +257,8 @@ witness_smoke_test2() -> end, %% Chunk1 is still readable: not affected by wedged witness head. - {ok, [{_, Off1, Chunk1, _}]} = - machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, {[{_, Off1, Chunk1, _}], []}} = + machi_cr_client:read_chunk(C1, File1, Off1, Size1, []), %% But because the head is wedged, an append will fail. {error, partition} = machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000), diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index ab824e2..78dbd9b 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -202,8 +202,9 @@ read_next(S, _Res, _Args) -> S. 
read(Pid, Offset, Length) -> case machi_file_proxy:read(Pid, Offset, Length) of - {ok, Chunks} -> - [{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length), + {ok, {Chunks, _}} -> + [{_, Offset, Data, Csum}] = + machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length), {ok, Data, Csum}; E -> E diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index 017effc..8269483 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -80,22 +80,22 @@ machi_file_proxy_test_() -> clean_up_data_dir(?TESTDIR), {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), [ - ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), - ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), - ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), - ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), - ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), - ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), - ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), - ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)), - ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), - ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), - ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) + ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), + ?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), + ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ]. 
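+%% The matches above pin the reshaped reply: a successful read now
+%% returns {ok, {Chunks, Trimmed}}, where Trimmed stays [] until trim
+%% support lands. E.g. (hypothetical, for a previously written range):
+%%
+%%     {ok, {[{"test", 1025, _Data, _Csum}], []}} =
+%%         machi_file_proxy:read(Pid, 1025, 1000),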
diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl
index ac3ba33..b8dded9 100644
--- a/test/machi_flu1_test.erl
+++ b/test/machi_flu1_test.erl
@@ -97,8 +97,8 @@ flu_smoke_test() ->
     {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
                                                   ?DUMMY_PV1_EPOCH,
                                                   Prefix, Chunk1),
-    {ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                                     File1, Off1, Len1),
+    {ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+                                                          File1, Off1, Len1, []),
     {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
                                            ?DUMMY_PV1_EPOCH, File1),
     true = is_binary(KludgeBin),
@@ -109,7 +109,7 @@ flu_smoke_test() ->
     Len1 = size(Chunk1),
     {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
                                              ?DUMMY_PV1_EPOCH,
-                                             File1, Off1*983829323, Len1),
+                                             File1, Off1*983829323, Len1, []),
     %% XXX FIXME
     %%
     %% This is failing because the read extends past the end of the file.
@@ -151,14 +151,14 @@ flu_smoke_test() ->
                                       File2, Off2, Chunk2),
     {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
                                           BadFile, Off2, Chunk2),
-    {ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                                     File2, Off2, Len2),
+    {ok, {[{_, Off2, Chunk2, _}], _}} =
+        ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, []),
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                         "no!!", Off2, Len2),
+                                         "no!!", Off2, Len2, []),
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                         BadFile, Off2, Len2),
+                                         BadFile, Off2, Len2, []),

     %% We know that File1 still exists.  Pretend that we've done a
     %% migration and exercise the delete_migration() API.
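
Reviewer note: at the FLU level the same shape applies, with the epoch threaded
through each call; the smoke test deliberately matches the trimmed element loosely
({ok, {Chunks, _}}) rather than pinning it to []. A sketch of a direct single-FLU
read under the new arity; flu_read/6 is an illustrative wrapper, and the error
atoms are the ones the tests above exercise:

    %% Sketch: read straight from one FLU, no chain replication involved.
    flu_read(Host, TcpPort, EpochID, File, Off, Len) ->
        case machi_flu1_client:read_chunk(Host, TcpPort, EpochID,
                                          File, Off, Len, []) of
            {ok, {Chunks, _Trimmed}}  -> {ok, Chunks};
            {error, not_written} = E -> E;   % range was never written
            {error, bad_arg} = E     -> E;   % e.g. bogus file name
            {error, wedged} = E      -> E    % FLU awaiting a new projection
        end.
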
@@ -261,7 +261,7 @@ witness_test() ->
                                            Prefix, Chunk1),
     File = <<"foofile">>,
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
-                                         File, 9999, 9999),
+                                         File, 9999, 9999, []),
     {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1, File),
     {error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),
diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl
index 8a45963..1c7b015 100644
--- a/test/machi_flu_psup_test.erl
+++ b/test/machi_flu_psup_test.erl
@@ -146,7 +146,7 @@ partial_stop_restart2() ->
     {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
     {error, wedged} = machi_flu1_client:read_chunk(
                         Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
-                        <<>>, 99999999, 1),
+                        <<>>, 99999999, 1, []),
     {error, wedged} = machi_flu1_client:checksum_list(
                         Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
     %% list_files() is permitted despite wedged status
diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl
index 1f5a1bf..25c79fd 100644
--- a/test/machi_pb_high_client_test.erl
+++ b/test/machi_pb_high_client_test.erl
@@ -81,8 +81,8 @@ smoke_test2() ->
                  {iolist_to_binary(Chunk3), File3, Off3, Size3}],
         [begin
              File = binary_to_list(Fl),
-             ?assertMatch({ok, [{File, Off, Ch, _}]},
-                          ?C:read_chunk(Clnt, Fl, Off, Sz))
+             ?assertMatch({ok, {[{File, Off, Ch, _}], []}},
+                          ?C:read_chunk(Clnt, Fl, Off, Sz, []))
         end || {Ch, Fl, Off, Sz} <- Reads],

         {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl
index 63fab07..3adfad5 100644
--- a/test/machi_proxy_flu1_client_test.erl
+++ b/test/machi_proxy_flu1_client_test.erl
@@ -60,14 +60,14 @@ api_smoke_test() ->
     {ok, {MyOff,MySize,MyFile}} =
         ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
                           infinity),
-    {ok, [{_, MyOff, MyChunk, _}]} =
-        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
+    {ok, {[{_, MyOff, MyChunk, _}], []}} =
+        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []),
     MyChunk2 = <<"my chunk data, yeah, again">>,
     {ok, {MyOff2,MySize2,MyFile2}} =
         ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
                                 MyChunk2, 4242, infinity),
-    {ok, [{_, MyOff2, MyChunk2, _}]} =
-        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
+    {ok, {[{_, MyOff2, MyChunk2, _}], []}} =
+        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []),
     MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
     {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
                                               Prefix, MyChunk_badcs),
@@ -245,13 +245,13 @@ flu_restart_test2() ->
              (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
                                          <<"prefix">>, Data, 42, infinity)
          end,
-         fun(run) -> {ok, [{_, Off1, Data, _}]} =
+         fun(run) -> {ok, {[{_, Off1, Data, _}], []}} =
                          ?MUT:read_chunk(Prox1, FakeEpoch,
-                                         File1, Off1, Size1),
+                                         File1, Off1, Size1, []),
                      ok;
             (line) -> io:format("line ~p, ", [?LINE]);
            (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
-                                      File1, Off1, Size1)
+                                      File1, Off1, Size1, [])
          end,
          fun(run) -> {ok, KludgeBin} =
                          ?MUT:checksum_list(Prox1, FakeEpoch, File1),
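
Reviewer note: the partial_stop_restart2() hunk shows the asymmetry of the wedged
state: data operations such as read_chunk() and checksum_list() are refused with
{error, wedged}, while list_files() still succeeds. A sketch of probing for that
state; probe_wedged/2 is an illustrative helper, and the exact success shape of
list_files() is an assumption here:

    %% Sketch: a wedged FLU rejects reads but still answers list_files(),
    %% per the assertions in partial_stop_restart2() above.
    probe_wedged(Addr, TcpPort) ->
        R = machi_flu1_client:read_chunk(Addr, TcpPort, ?DUMMY_PV1_EPOCH,
                                         <<>>, 99999999, 1, []),
        L = machi_flu1_client:list_files(Addr, TcpPort, ?DUMMY_PV1_EPOCH),
        case {R, L} of
            {{error, wedged}, {ok, _Files}} -> wedged;
            _                               -> not_wedged
        end.
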
diff --git a/tools.mk b/tools.mk
deleted file mode 100644
index 1c40f8e..0000000
--- a/tools.mk
+++ /dev/null
@@ -1,167 +0,0 @@
-# -------------------------------------------------------------------
-#
-# Copyright (c) 2014 Basho Technologies, Inc.
-#
-# This file is provided to you under the Apache License,
-# Version 2.0 (the "License"); you may not use this file
-# except in compliance with the License.  You may obtain
-# a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# -------------------------------------------------------------------
-
-# -------------------------------------------------------------------
-# NOTE: This file is from https://github.com/basho/tools.mk.
-# It should not be edited in a project.  It should simply be updated
-# wholesale when a new version of tools.mk is released.
-# -------------------------------------------------------------------
-
-REBAR ?= ./rebar
-REVISION ?= $(shell git rev-parse --short HEAD)
-PROJECT ?= $(shell basename `find src -name "*.app.src"` .app.src)
-
-.PHONY: compile-no-deps test docs xref dialyzer-run dialyzer-quick dialyzer \
-	cleanplt upload-docs
-
-compile-no-deps:
-	${REBAR} compile skip_deps=true
-
-test: compile
-	${REBAR} eunit skip_deps=true
-
-upload-docs: docs
-	@if [ -z "${BUCKET}" -o -z "${PROJECT}" -o -z "${REVISION}" ]; then \
-		echo "Set BUCKET, PROJECT, and REVISION env vars to upload docs"; \
-		exit 1; fi
-	@cd doc; s3cmd put -P * "s3://${BUCKET}/${PROJECT}/${REVISION}/" > /dev/null
-	@echo "Docs built at: http://${BUCKET}.s3-website-us-east-1.amazonaws.com/${PROJECT}/${REVISION}"
-
-docs:
-	${REBAR} doc skip_deps=true
-
-xref: compile
-	${REBAR} xref skip_deps=true
-
-PLT ?= $(HOME)/.combo_dialyzer_plt
-LOCAL_PLT = .local_dialyzer_plt
-DIALYZER_FLAGS ?= -Wunmatched_returns
-NATIVE_EBIN ?= ./.ebin.native
-DIALYZER_BIN ?= dialyzer
-# Always include -pa arg in DIALYZER_CMD for speed
-DIALYZER_CMD ?= $(DIALYZER_BIN) -pa $(NATIVE_EBIN)
-DIALYZER_VERSION = $(shell $(DIALYZER_BIN) --version | sed 's/.* //')
-ERL_LIB_DIR = $(shell erl -eval '{io:format("~s\n", [code:lib_dir()]), erlang:halt(0)}.' | tail -1)
-
-native-ebin:
-	mkdir -p $(NATIVE_EBIN)
-	rm -f $(NATIVE_EBIN)/*.erl $(NATIVE_EBIN)/*.hrl $(NATIVE_EBIN)/*.beam
-	cp $(ERL_LIB_DIR)/stdlib-*/src/{lists,dict,digraph,digraph_utils,ets,gb_sets,gb_trees,ordsets,sets,sofs}.erl $(NATIVE_EBIN)
-	cp $(ERL_LIB_DIR)/compiler-*/src/{cerl,cerl_trees,core_parse}.?rl $(NATIVE_EBIN)
-	cp $(ERL_LIB_DIR)/dialyzer-*/src/{dialyzer_analysis_callgraph,dialyzer,dialyzer_behaviours,dialyzer_codeserver,dialyzer_contracts,dialyzer_coordinator,dialyzer_dataflow,dialyzer_dep,dialyzer_plt,dialyzer_succ_typings,dialyzer_typesig,dialyzer_worker}.?rl $(NATIVE_EBIN)
-	cp $(ERL_LIB_DIR)/hipe-*/*/{erl_types,erl_bif_types}.?rl $(NATIVE_EBIN)
-	erlc -o $(NATIVE_EBIN) -smp +native -DVSN='"$(DIALYZER_VERSION)"' $(NATIVE_EBIN)/*erl
-
-${PLT}: compile
-	@mkdir -p $(NATIVE_EBIN)
-	@if [ -f $(PLT) ]; then \
-		$(DIALYZER_CMD) --check_plt --plt $(PLT) --apps $(DIALYZER_APPS) && \
-		$(DIALYZER_CMD) --add_to_plt --plt $(PLT) --output_plt $(PLT) --apps $(DIALYZER_APPS) ; test $$? -ne 1; \
-	else \
-		$(DIALYZER_CMD) --build_plt --output_plt $(PLT) --apps $(DIALYZER_APPS); test $$? -ne 1; \
-	fi
-
-${LOCAL_PLT}: compile
-	@mkdir -p $(NATIVE_EBIN)
-	@if [ -d deps ]; then \
-		if [ -f $(LOCAL_PLT) ]; then \
-			$(DIALYZER_CMD) --check_plt --plt $(LOCAL_PLT) deps/*/ebin && \
-			$(DIALYZER_CMD) --add_to_plt --plt $(LOCAL_PLT) --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \
-		else \
-			$(DIALYZER_CMD) --build_plt --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \
-		fi \
-	fi
-
-dialyzer-run:
-	@mkdir -p $(NATIVE_EBIN)
-	@echo "==> $(shell basename $(shell pwd)) (dialyzer)"
-# The bulk of the code below deals with the dialyzer.ignore-warnings file
-# which contains strings to ignore if output by dialyzer.
-# Typically the strings include line numbers. Using them exactly is hard
-# to maintain as the code changes. This approach instead ignores the line
-# numbers, but takes into account the number of times a string is listed
-# for a given file. So if one string is listed once, for example, and it
-# appears twice in the warnings, the user is alerted. It is possible but
-# unlikely that this approach could mask a warning if one ignored warning
-# is removed and two warnings of the same kind appear in the file, for
-# example. But it is a trade-off that seems worth it.
-# Details of the cryptic commands:
-# - Remove line numbers from dialyzer.ignore-warnings
-# - Pre-pend duplicate count to each warning with sort | uniq -c
-# - Remove annoying white space around duplicate count
-# - Save in dialyzer.ignore-warnings.tmp
-# - Do the same to dialyzer_warnings
-# - Remove matches from dialyzer.ignore-warnings.tmp from output
-# - Remove duplicate count
-# - Escape regex special chars to use lines as regex patterns
-# - Add pattern to match any line number (file.erl:\d+:)
-# - Anchor to match the entire line (^entire line$)
-# - Save in dialyzer_unhandled_warnings
-# - Output matches for those patterns found in the original warnings
-	@if [ -f $(LOCAL_PLT) ]; then \
-		PLTS="$(PLT) $(LOCAL_PLT)"; \
-	else \
-		PLTS=$(PLT); \
-	fi; \
-	if [ -f dialyzer.ignore-warnings ]; then \
-		if [ $$(grep -cvE '[^[:space:]]' dialyzer.ignore-warnings) -ne 0 ]; then \
-			echo "ERROR: dialyzer.ignore-warnings contains a blank/empty line, this will match all messages!"; \
-			exit 1; \
-		fi; \
-		$(DIALYZER_CMD) $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin > dialyzer_warnings ; \
-		cat dialyzer.ignore-warnings \
-		| sed -E 's/^([^:]+:)[^:]+:/\1/' \
-		| sort \
-		| uniq -c \
-		| sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \
-		> dialyzer.ignore-warnings.tmp ; \
-		egrep -v "^[[:space:]]*(done|Checking|Proceeding|Compiling)" dialyzer_warnings \
-		| sed -E 's/^([^:]+:)[^:]+:/\1/' \
-		| sort \
-		| uniq -c \
-		| sed -E '/.*\.erl: /!s/^[[:space:]]*[0-9]+[[:space:]]*//' \
-		| grep -F -f dialyzer.ignore-warnings.tmp -v \
-		| sed -E 's/^[[:space:]]*[0-9]+[[:space:]]*//' \
-		| sed -E 's/([]\^:+?|()*.$${}\[])/\\\1/g' \
-		| sed -E 's/(\\\.erl\\\:)/\1[[:digit:]]+:/g' \
-		| sed -E 's/^(.*)$$/^[[:space:]]*\1$$/g' \
-		> dialyzer_unhandled_warnings ; \
-		rm dialyzer.ignore-warnings.tmp; \
-		if [ $$(cat dialyzer_unhandled_warnings | egrep -v 'Unknown functions\\:' | wc -l) -gt 0 ]; then \
-			egrep -f dialyzer_unhandled_warnings dialyzer_warnings ; \
-			found_warnings=1; \
-		fi; \
-		[ "$$found_warnings" != 1 ] ; \
-	else \
-		$(DIALYZER_CMD) $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin; \
-	fi
-
-dialyzer-quick: compile-no-deps dialyzer-run
-
-dialyzer: ${PLT} ${LOCAL_PLT} dialyzer-run
-
-cleanplt:
-	@echo
-	@echo "Are you sure?  It takes several minutes to re-build."
-	@echo Deleting $(PLT) and $(LOCAL_PLT) in 5 seconds.
-	@echo
-	sleep 5
-	rm $(PLT)
-	rm $(LOCAL_PLT)