diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index cb34da1..146fe65 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -105,6 +105,8 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> RepairMode = proplists:get_value(repair_mode, Opts, repair), Verb = proplists:get_value(verbose, Opts, false), RepairId = proplists:get_value(repair_id, Opts, id1), +erlang:display(wtf), + %% io:format(user, "TODO: ~p\n", [{error, {What, Why, Stack}}]), Res = try _ = [begin {ok, Proxy} = machi_proxy_flu1_client:start_link(P), @@ -127,6 +129,7 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id( SrcProxy, ?SHORT_TIMEOUT), %% ?VERB("Make repair directives: "), +erlang:display(yo1), Ds = [{File, make_repair_directives( ConsistencyMode, RepairMode, File, Size, EpochID, @@ -146,16 +149,21 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> end || FLU <- OurFLUs], %% ?VERB("Execute repair directives: "), +erlang:display(yo1), ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, Verb, OurFLUs, ProxiesDict, ETS), +erlang:display(yo2), %% ?VERB(" done\n"), lager:info("Repair ~w repair directives finished\n", [RepairId]), ok catch What:Why -> +io:format(user, "yo3 ~p ~p\n", [What,Why]), Stack = erlang:get_stacktrace(), +io:format(user, "yo3 ~p\n", [Stack]), {error, {What, Why, Stack}} after +erlang:display(yo4), [(catch machi_proxy_flu1_client:quit(Pid)) || Pid <- orddict:to_list(get(proxies_dict))] end, @@ -236,7 +244,7 @@ make_repair_directives(ConsistencyMode, RepairMode, File, Size, _EpochID, make_repair_directives2(C2, ConsistencyMode, RepairMode, File, Verb, Src, FLUs, ProxiesDict, ETS) -> - ?VERB("."), + ?VERB(".1"), make_repair_directives3(C2, ConsistencyMode, RepairMode, File, Verb, Src, FLUs, ProxiesDict, ETS, []). @@ -327,17 +335,17 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> F = fun({copy, {Offset, Size, TaggedCSum, MySrc}, MyDsts}, Acc2) -> SrcP = orddict:fetch(MySrc, ProxiesDict), case ets:lookup_element(ETS, in_chunks, 2) rem 100 of - 0 -> ?VERB(".", []); + 0 -> ?VERB(".2", []); _ -> ok end, _T1 = os:timestamp(), %% TODO: support case multiple written or trimmed chunks returned NSInfo = undefined, - io:format(user, "TODO fix broken read_chunk mod ~s line ~w\n", [?MODULE, ?LINE]), - {ok, {[{_, Offset, Chunk, _}], _}} = + {ok, {[{_, Offset, Chunk, _ReadCSum}|OtherChunks], []=_TrimmedList}} = machi_proxy_flu1_client:read_chunk( SrcP, NSInfo, EpochID, File, Offset, Size, undefined, ?SHORT_TIMEOUT), + [] = OtherChunks, _T2 = os:timestamp(), <<_Tag:1/binary, CSum/binary>> = TaggedCSum, case machi_util:checksum_chunk(Chunk) of @@ -346,7 +354,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> DstP = orddict:fetch(DstFLU, ProxiesDict), _T3 = os:timestamp(), ok = machi_proxy_flu1_client:write_chunk( - DstP, NSInfo, EpochID, File, Offset, Chunk, + DstP, NSInfo, EpochID, File, Offset, Chunk, TaggedCSum, ?SHORT_TIMEOUT), _T4 = os:timestamp() end || DstFLU <- MyDsts], @@ -371,7 +379,9 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> Acc2 end end, +erlang:display({yo,?LINE}), ok = lists:foldl(F, ok, Cmds), +erlang:display({yo,?LINE}), %% Copy this file's stats to the total counts. _ = [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || {L_K, T_K} <- EtsKeys], diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index e77f9e2..cc4a508 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -63,7 +63,7 @@ %% File API append_chunk/5, append_chunk/6, append_chunk/7, - write_chunk/5, write_chunk/6, + write_chunk/6, write_chunk/7, read_chunk/6, read_chunk/7, trim_chunk/5, trim_chunk/6, checksum_list/2, checksum_list/3, @@ -129,14 +129,14 @@ append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, #append_opts{}=Opts, Timeout0 %% allocated/sequenced by an earlier append_chunk() call) to %% `File' at `Offset'. -write_chunk(PidSpec, NSInfo, File, Offset, Chunk) -> - write_chunk(PidSpec, NSInfo, File, Offset, Chunk, ?DEFAULT_TIMEOUT). +write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum) -> + write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -write_chunk(PidSpec, NSInfo, File, Offset, Chunk, Timeout0) -> +write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum, Timeout0) -> {TO, Timeout} = timeout(Timeout0), - gen_server:call(PidSpec, {req, {write_chunk, NSInfo, File, Offset, Chunk, TO}}, + gen_server:call(PidSpec, {req, {write_chunk, NSInfo, File, Offset, Chunk, CSum, TO}}, Timeout). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. @@ -229,8 +229,8 @@ handle_call2({append_chunk, NSInfo, Prefix, Chunk, CSum, Opts, TO}, _From, S) -> do_append_head(NSInfo, Prefix, Chunk, CSum, Opts, 0, os:timestamp(), TO, S); -handle_call2({write_chunk, NSInfo, File, Offset, Chunk, TO}, _From, S) -> - do_write_head(NSInfo, File, Offset, Chunk, 0, os:timestamp(), TO, S); +handle_call2({write_chunk, NSInfo, File, Offset, Chunk, CSum, TO}, _From, S) -> + do_write_head(NSInfo, File, Offset, Chunk, CSum, 0, os:timestamp(), TO, S); handle_call2({read_chunk, NSInfo, File, Offset, Size, Opts, TO}, _From, S) -> do_read_chunk(NSInfo, File, Offset, Size, Opts, 0, os:timestamp(), TO, S); handle_call2({trim_chunk, NSInfo, File, Offset, Size, TO}, _From, S) -> @@ -246,7 +246,6 @@ do_append_head(NSInfo, Prefix, Chunk, CSum, Opts, Depth + 1, STime, TO, S); do_append_head(NSInfo, Prefix, Chunk, CSum, Opts, Depth, STime, TO, #state{proj=P}=S) -> - %% io:format(user, "head sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > TO -> @@ -300,9 +299,9 @@ do_append_head3(NSInfo, Prefix, case ?FLU_PC:append_chunk(Proxy, NSInfo, EpochID, Prefix, Chunk, CSum, Opts, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - do_append_midtail(RestFLUs, NSInfo, Prefix, + do_wr_app_midtail(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, CSum, Opts, - [HeadFLU], 0, STime, TO, S); + [HeadFLU], 0, STime, TO, append, S); {error, bad_checksum}=BadCS -> {reply, BadCS, S}; {error, Retry} @@ -323,17 +322,16 @@ do_append_head3(NSInfo, Prefix, Prefix,iolist_size(Chunk)}) end. -do_append_midtail(RestFLUs, NSInfo, Prefix, +do_wr_app_midtail(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, CSum, Opts, - Ws, Depth, STime, TO, S) + Ws, Depth, STime, TO, MyOp, S) when RestFLUs == [] orelse Depth == 0 -> - do_append_midtail2(RestFLUs, NSInfo, Prefix, + do_wr_app_midtail2(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, CSum, Opts, - Ws, Depth + 1, STime, TO, S); -do_append_midtail(_RestFLUs, NSInfo, Prefix, File, + Ws, Depth + 1, STime, TO, MyOp, S); +do_wr_app_midtail(_RestFLUs, NSInfo, Prefix, File, Offset, Chunk, CSum, Opts, - Ws, Depth, STime, TO, #state{proj=P}=S) -> - %% io:format(user, "midtail sleep2,", []), + Ws, Depth, STime, TO, MyOp, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > TO -> @@ -347,60 +345,66 @@ do_append_midtail(_RestFLUs, NSInfo, Prefix, File, RestFLUs2 = mutation_flus(P2), case RestFLUs2 -- Ws of RestFLUs2 -> - %% None of the writes that we have done so far - %% are to FLUs that are in the RestFLUs2 list. - %% We are pessimistic here and assume that - %% those FLUs are permanently dead. Start - %% over with a new sequencer assignment, at - %% the 2nd have of the impl (we have already - %% slept & refreshed the projection). - if Prefix == undefined -> % atom! not binary()!! {error, partition}; - true -> + MyOp == append -> + %% None of the writes that we have done so + %% far are to FLUs that are in the + %% RestFLUs2 list. We are pessimistic + %% here and assume that those FLUs are + %% permanently dead. Start over with a + %% new sequencer assignment, at the 2nd + %% have of the impl (we have already slept + %% & refreshed the projection). do_append_head2(NSInfo, Prefix, Chunk, CSum, Opts, - Depth, STime, TO, S2) + Depth, STime, TO, S2); + MyOp == write -> + do_wr_app_midtail2(RestFLUs2, + NSInfo, + Prefix, File, Offset, + Chunk, CSum, Opts, + Ws, Depth + 1, STime, TO, + MyOp, S2) end; RestFLUs3 -> - do_append_midtail2(RestFLUs3, + do_wr_app_midtail2(RestFLUs3, NSInfo, Prefix, File, Offset, Chunk, CSum, Opts, - Ws, Depth + 1, STime, TO, S2) + Ws, Depth + 1, STime, TO, + MyOp, S2) end end end. -do_append_midtail2([], _NSInfo, +do_wr_app_midtail2([], _NSInfo, _Prefix, File, Offset, Chunk, - _CSum, _Opts, _Ws, _Depth, _STime, _TO, S) -> - %% io:format(user, "ok!\n", []), + _CSum, _Opts, _Ws, _Depth, _STime, _TO, _MyOp, S) -> {reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S}; -do_append_midtail2([FLU|RestFLUs]=FLUs, NSInfo, +do_wr_app_midtail2([FLU|RestFLUs]=FLUs, NSInfo, Prefix, File, Offset, Chunk, - CSum, Opts, Ws, Depth, STime, TO, + CSum, Opts, Ws, Depth, STime, TO, MyOp, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(FLU, PD), - case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of + case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of ok -> - %% io:format(user, "write ~w,", [FLU]), - do_append_midtail2(RestFLUs, NSInfo, Prefix, + do_wr_app_midtail2(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, - CSum, Opts, [FLU|Ws], Depth, STime, TO, S); + CSum, Opts, [FLU|Ws], Depth, STime, TO, MyOp, S); {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_append_midtail(FLUs, NSInfo, Prefix, + do_wr_app_midtail(FLUs, NSInfo, Prefix, File, Offset, Chunk, - CSum, Opts, Ws, Depth, STime, TO, S); + CSum, Opts, Ws, Depth, STime, TO, MyOp, S); {error, written} -> %% We know what the chunk ought to be, so jump to the %% middle of read-repair. Resume = {append, Offset, iolist_size(Chunk), File}, - do_repair_chunk(FLUs, Resume, Chunk, [], NSInfo, File, Offset, + do_repair_chunk(FLUs, Resume, Chunk, CSum, [], NSInfo, File, Offset, iolist_size(Chunk), Depth, STime, S); {error, trimmed} = Err -> %% TODO: nothing can be done @@ -426,10 +430,9 @@ witnesses_use_our_epoch([FLU|RestFLUs], false end. -do_write_head(NSInfo, File, Offset, Chunk, 0=Depth, STime, TO, S) -> - do_write_head2(NSInfo, File, Offset, Chunk, Depth + 1, STime, TO, S); -do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) -> - %% io:format(user, "head sleep1,", []), +do_write_head(NSInfo, File, Offset, Chunk, CSum, 0=Depth, STime, TO, S) -> + do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth + 1, STime, TO, S); +do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > TO -> @@ -443,31 +446,32 @@ do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) - case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - do_write_head(NSInfo, File, Offset, Chunk, Depth + 1, + do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth + 1, STime, TO, S2); _ -> - do_write_head2(NSInfo, File, Offset, Chunk, Depth + 1, + do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth + 1, STime, TO, S2) end end. -do_write_head2(NSInfo, File, Offset, Chunk, Depth, STime, TO, +do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> [HeadFLU|RestFLUs] = mutation_flus(P), Proxy = orddict:fetch(HeadFLU, PD), - case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of + case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of ok -> %% From this point onward, we use the same code & logic path as %% append does. -Prefix=todo_prefix,CSum=todo_csum,Opts=todo_opts, - do_append_midtail(RestFLUs, NSInfo, Prefix, + Prefix=unused_write_path, + Opts=unused_write_path, + do_wr_app_midtail(RestFLUs, NSInfo, Prefix, File, Offset, Chunk, - CSum, Opts, [HeadFLU], 0, STime, TO, S); + CSum, Opts, [HeadFLU], 0, STime, TO, write, S); {error, bad_checksum}=BadCS -> {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, S); + do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO, S); {error, written}=Err -> {reply, Err, S}; {error, trimmed}=Err -> @@ -588,7 +592,6 @@ do_trim_midtail(RestFLUs, Prefix, NSInfo, File, Offset, Size, Ws, Depth + 1, STime, TO, S); do_trim_midtail(_RestFLUs, Prefix, NSInfo, File, Offset, Size, Ws, Depth, STime, TO, #state{proj=P}=S) -> - %% io:format(user, "midtail sleep2,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > TO -> @@ -625,7 +628,6 @@ do_trim_midtail(_RestFLUs, Prefix, NSInfo, File, Offset, Size, do_trim_midtail2([], _Prefix, _NSInfo, _File, _Offset, _Size, _Ws, _Depth, _STime, _TO, S) -> - %% io:format(user, "ok!\n", []), {reply, ok, S}; do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, NSInfo, File, Offset, Size, Ws, Depth, STime, TO, @@ -633,7 +635,6 @@ do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, NSInfo, File, Offset, Size, Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:trim_chunk(Proxy, NSInfo, EpochID, File, Offset, Size, ?TIMEOUT) of ok -> - %% io:format(user, "write ~w,", [FLU]), do_trim_midtail2(RestFLUs, Prefix, NSInfo, File, Offset, Size, [FLU|Ws], Depth, STime, TO, S); {error, trimmed} -> @@ -748,10 +749,11 @@ read_repair2(ap_mode=ConsistencyMode, do_repair_chunks([], _, _, _, _, _, _, _, S, Reply) -> {Reply, S}; -do_repair_chunks([{_, Offset, Chunk, _Csum}|T], +do_repair_chunks([{_, Offset, Chunk, CSum}|T], ToRepair, ReturnMode, [GotItFrom], NSInfo, File, Depth, STime, S, Reply) -> + true = _TODO_fixme = not is_atom(CSum), Size = iolist_size(Chunk), - case do_repair_chunk(ToRepair, ReturnMode, Chunk, [GotItFrom], NSInfo, File, Offset, + case do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, [GotItFrom], NSInfo, File, Offset, Size, Depth, STime, S) of {ok, Chunk, S1} -> do_repair_chunks(T, ToRepair, ReturnMode, [GotItFrom], NSInfo, File, Depth, STime, S1, Reply); @@ -759,9 +761,8 @@ do_repair_chunks([{_, Offset, Chunk, _Csum}|T], Error end. -do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset, +do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> - %% io:format(user, "read_repair3 sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> @@ -771,16 +772,16 @@ do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset, case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, + do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth + 1, STime, S2); P2 -> ToRepair2 = mutation_flus(P2) -- Repaired, - do_repair_chunk2(ToRepair2, ReturnMode, Chunk, Repaired, NSInfo, File, + do_repair_chunk2(ToRepair2, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth + 1, STime, S2) end end. -do_repair_chunk2([], ReturnMode, Chunk, _Repaired, _NSInfo, File, Offset, +do_repair_chunk2([], ReturnMode, Chunk, _CSum, _Repaired, _NSInfo, File, Offset, _IgnoreSize, _Depth, _STime, S) -> %% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc? case ReturnMode of @@ -789,24 +790,24 @@ do_repair_chunk2([], ReturnMode, Chunk, _Repaired, _NSInfo, File, Offset, {append, Offset, Size, File} -> {ok, {Offset, Size, File}, S} end; -do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset, +do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(First, PD), - case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of + case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of ok -> - do_repair_chunk2(Rest, ReturnMode, Chunk, [First|Repaired], NSInfo, File, + do_repair_chunk2(Rest, ReturnMode, Chunk, CSum, [First|Repaired], NSInfo, File, Offset, Size, Depth, STime, S); {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? {BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, + do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth, STime, S); {error, written} -> %% TODO: To be very paranoid, read the chunk here to verify %% that it is exactly our Chunk. - do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, NSInfo, File, + do_repair_chunk2(Rest, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset, Size, Depth, STime, S); {error, trimmed} = _Error -> %% TODO @@ -926,11 +927,13 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict, update_proj2(Count + 1, S); P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, - members_dict=NewMembersDict} = P, + members_dict=NewMembersDict, dbg2=Dbg2} = P, EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), - S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, + %% Make crash reports shorter by getting rid of 'react' history. + P2 = P#projection_v1{dbg2=lists:keydelete(react, 1, Dbg2)}, + S#state{bad_proj=undefined, proj=P2, epoch_id=EpochID, members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _P -> sleep_a_while(Count), diff --git a/src/machi_dt.erl b/src/machi_dt.erl index de34b64..6a57e86 100644 --- a/src/machi_dt.erl +++ b/src/machi_dt.erl @@ -24,11 +24,12 @@ -include("machi_projection.hrl"). -type append_opts() :: #append_opts{}. --type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. --type chunk_bin() :: binary() | iolist(). % client can use either --type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum --type chunk_summary() :: {file_offset(), chunk_size(), binary()}. --type chunk_s() :: 'trimmed' | binary(). +-type chunk() :: chunk_bin() | iolist(). % client can choose either rep. +-type chunk_bin() :: binary(). % server returns binary() only. +-type chunk_csum() :: <<>> | chunk_csum_bin() | {csum_tag(), binary()}. +-type chunk_csum_bin() :: binary(). % 1 byte tag, N-1 bytes checksum +-type chunk_cstrm() :: 'trimmed' | chunk_csum(). +-type chunk_summary() :: {file_offset(), chunk_size(), chunk_bin(), chunk_cstrm()}. -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). -type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'. @@ -62,9 +63,9 @@ chunk/0, chunk_bin/0, chunk_csum/0, - csum_tag/0, + chunk_csum_bin/0, + chunk_cstrm/0, chunk_summary/0, - chunk_s/0, chunk_pos/0, chunk_size/0, error_general/0, diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index bf36fee..37e6d5a 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -145,7 +145,7 @@ ]). %% For "internal" replication only. -export([ - write_chunk/6, write_chunk/7, + write_chunk/7, write_chunk/8, trim_chunk/6, delete_migration/3, delete_migration/4, trunc_hack/3, trunc_hack/4 @@ -216,7 +216,7 @@ append_chunk(Host, TcpPort, NSInfo0, EpochID, -spec read_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:read_opts_x()) -> - {ok, machi_dt:chunk_s()} | + {ok, {[machi_dt:chunk_summary()], [machi_dt:chunk_pos()]}} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Sock, NSInfo0, EpochID, File, Offset, Size, Opts0) @@ -230,7 +230,7 @@ read_chunk(Sock, NSInfo0, EpochID, File, Offset, Size, Opts0) -spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:read_opts_x()) -> - {ok, machi_dt:chunk_s()} | + {ok, [machi_dt:chunk_summary()]} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Size, Opts0) @@ -527,25 +527,25 @@ disconnect(_) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> +-spec write_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk(), machi_dt:chunk_csum()) -> ok | {error, machi_dt:error_general()} | {error, term()}. -write_chunk(Sock, NSInfo0, EpochID, File, Offset, Chunk) +write_chunk(Sock, NSInfo0, EpochID, File, Offset, Chunk, CSum) when Offset >= ?MINIMUM_OFFSET -> NSInfo = machi_util:ns_info_default(NSInfo0), - write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk). + write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum). %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. -spec write_chunk(machi_dt:inet_host(), machi_dt:inet_port(), - 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) -> + 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk(), machi_dt:chunk_csum()) -> ok | {error, machi_dt:error_general()} | {error, term()}. -write_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Chunk) +write_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Chunk, CSum) when Offset >= ?MINIMUM_OFFSET -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try NSInfo = machi_util:ns_info_default(NSInfo0), - write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk) + write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum) after disconnect(Sock) end. @@ -641,19 +641,19 @@ append_chunk2(Sock, NSInfo, EpochID, Prefix, Chunk, CSum_tag, CSum, Opts}), do_pb_request_common(Sock, ReqID, Req, true, Timeout). -write_chunk2(Sock, NSInfo, EpochID, File0, Offset, Chunk0) -> +write_chunk2(Sock, NSInfo, EpochID, File0, Offset, Chunk, CSum0) -> ReqID = <<"id">>, #ns_info{version=NSVersion, name=NS} = NSInfo, File = machi_util:make_binary(File0), true = (Offset >= ?MINIMUM_OFFSET), - {Chunk, CSum_tag, CSum} = - case Chunk0 of - X when is_binary(X) -> - {Chunk0, ?CSUM_TAG_NONE, <<>>}; - {ChunkCSum, Chk} -> - {Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum), - {Chk, Tag, CS} - end, + {CSum_tag, CSum} = case CSum0 of + <<>> -> + {?CSUM_TAG_NONE, <<>>}; + {_Tag, _CS} -> + CSum0; + B when is_binary(B) -> + machi_util:unmake_tagged_csum(CSum0) + end, Req = machi_pb_translate:to_pb_request( ReqID, {low_write_chunk, NSVersion, NS, EpochID, File, Offset, Chunk, CSum_tag, CSum}), diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 5bbabba..7a9f549 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -586,12 +586,11 @@ do_pb_hl_request2({high_append_chunk, NS, Prefix, Chunk, TaggedCSum, Opts}, Res = machi_cr_client:append_chunk(Clnt, NSInfo, Prefix, Chunk, TaggedCSum, Opts), {Res, S}; -do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, +do_pb_hl_request2({high_write_chunk, File, Offset, Chunk, CSum}, #state{high_clnt=Clnt}=S) -> NSInfo = undefined, io:format(user, "TODO fix broken write_chunk mod ~s line ~w\n", [?MODULE, ?LINE]), - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:write_chunk(Clnt, NSInfo, File, Offset, Chunk), + Res = machi_cr_client:write_chunk(Clnt, NSInfo, File, Offset, Chunk, CSum), {Res, S}; do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, #state{high_clnt=Clnt}=S) -> diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 85600bd..23f01b8 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -102,7 +102,7 @@ auth(PidSpec, User, Pass, Timeout) -> -spec append_chunk(pid(), NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(), - Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), + Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(), Opts::machi_dt:append_opts()) -> {ok, Filename::string(), Offset::machi_dt:file_offset()} | {error, machi_client_error_reason()}. @@ -111,7 +111,7 @@ append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts) -> -spec append_chunk(pid(), NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(), - Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), + Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(), Opts::machi_dt:append_opts(), Timeout::non_neg_integer()) -> {ok, Filename::string(), Offset::machi_dt:file_offset()} | @@ -120,13 +120,13 @@ append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts, Timeout) -> send_sync(PidSpec, {append_chunk, NS, Prefix, Chunk, CSum, Opts}, Timeout). -spec write_chunk(pid(), File::string(), machi_dt:file_offset(), - Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum()) -> + Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum()) -> ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT). -spec write_chunk(pid(), File::string(), machi_dt:file_offset(), - Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), Timeout::non_neg_integer()) -> + Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(), Timeout::non_neg_integer()) -> ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 20aa897..707a339 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -196,9 +196,9 @@ from_pb_request(#mpb_request{req_id=ReqID, #mpb_writechunkreq{chunk=#mpb_chunk{file_name=File, offset=Offset, chunk=Chunk, - csum=CSum}} = IR, - TaggedCSum = make_tagged_csum(CSum, Chunk), - {ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}}; + csum=CSumRec}} = IR, + CSum = make_tagged_csum(CSumRec, Chunk), + {ReqID, {high_write_chunk, File, Offset, Chunk, CSum}}; from_pb_request(#mpb_request{req_id=ReqID, read_chunk=IR=#mpb_readchunkreq{}}) -> #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, @@ -732,7 +732,7 @@ to_pb_response(ReqID, {high_append_chunk, _NS, _Prefix, _Chunk, _TSum, _O}, Resp _Else -> make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) end; -to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, Resp) -> +to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _CSum}, Resp) -> case Resp of {ok, {_,_,_}} -> %% machi_cr_client returns ok 2-tuple, convert to simple ok. diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 5a85cd3..8f9dcf6 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -81,7 +81,7 @@ quit/1, %% Internal API - write_chunk/6, write_chunk/7, + write_chunk/7, write_chunk/8, trim_chunk/6, trim_chunk/7, %% Helpers @@ -280,14 +280,14 @@ quit(PidSpec) -> %% @doc Write a chunk (binary- or iolist-style) of data to a file %% with `Prefix' at `Offset'. -write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk) -> - write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, infinity). +write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum) -> + write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum, infinity). %% @doc Write a chunk (binary- or iolist-style) of data to a file %% with `Prefix' at `Offset'. -write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, Timeout) -> - case gen_server:call(PidSpec, {req, {write_chunk, NSInfo, EpochID, File, Offset, Chunk}}, +write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum, Timeout) -> + case gen_server:call(PidSpec, {req, {write_chunk, NSInfo, EpochID, File, Offset, Chunk, CSum}}, Timeout) of {error, written}=Err -> Size = byte_size(Chunk), @@ -384,9 +384,9 @@ make_req_fun({append_chunk, NSInfo, EpochID, make_req_fun({read_chunk, NSInfo, EpochID, File, Offset, Size, Opts}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:read_chunk(Sock, NSInfo, EpochID, File, Offset, Size, Opts) end; -make_req_fun({write_chunk, NSInfo, EpochID, File, Offset, Chunk}, +make_req_fun({write_chunk, NSInfo, EpochID, File, Offset, Chunk, CSum}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:write_chunk(Sock, NSInfo, EpochID, File, Offset, Chunk) end; + fun() -> Mod:write_chunk(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum) end; make_req_fun({trim_chunk, NSInfo, EpochID, File, Offset, Size}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:trim_chunk(Sock, NSInfo, EpochID, File, Offset, Size) end; diff --git a/src/machi_util.erl b/src/machi_util.erl index 8173898..95a42a5 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -448,6 +448,8 @@ int2bool(I) when is_integer(I) -> true. read_opts_default(#read_opts{}=NSInfo) -> NSInfo; +read_opts_default(A) when A == 'undefined'; A == 'noopt'; A == 'none' -> + #read_opts{}; read_opts_default(A) when is_atom(A) -> #read_opts{}. diff --git a/test/machi_ap_repair_eqc.erl b/test/machi_ap_repair_eqc.erl index 14b8005..bbb8717 100644 --- a/test/machi_ap_repair_eqc.erl +++ b/test/machi_ap_repair_eqc.erl @@ -484,7 +484,7 @@ eqc_verbose() -> os:getenv("EQC_VERBOSE") =:= "true". eqc_timeout(Default) -> - PropTimeout = case os:getenv("EQC_TIMEOUT") of + PropTimeout = case os:getenv("EQC_TIME") of false -> Default; V -> list_to_integer(V) end, diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 885fb35..7e4d31c 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -139,7 +139,7 @@ smoke_test2() -> Host, PortBase+X, NSInfo, EpochID, File1, FooOff1, Size1, undefined) || X <- [0,1,2] ], ok = machi_flu1_client:write_chunk(Host, PortBase+0, NSInfo, EpochID, - File1, FooOff1, Chunk1), + File1, FooOff1, Chunk1, NoCSum), {ok, {[{_, FooOff1, Chunk1, _}], []}} = machi_flu1_client:read_chunk(Host, PortBase+0, NSInfo, EpochID, File1, FooOff1, Size1, undefined), @@ -156,7 +156,7 @@ smoke_test2() -> Chunk2 = <<"Middle repair chunk">>, Size2 = size(Chunk2), ok = machi_flu1_client:write_chunk(Host, PortBase+1, NSInfo, EpochID, - File1, FooOff2, Chunk2), + File1, FooOff2, Chunk2, NoCSum), {ok, {[{_, FooOff2, Chunk2, _}], []}} = machi_cr_client:read_chunk(C1, NSInfo, File1, FooOff2, Size2, undefined), [{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} = @@ -196,7 +196,7 @@ smoke_test2() -> %% {error,not_written} = machi_cr_client:read_chunk(C1, NSInfo, File10, %% Offx, Size10), {ok, {Offx,Size10,File10}} = - machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10), + machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, NoCSum), {ok, {[{_, Offx, Chunk10, _}], []}} = machi_cr_client:read_chunk(C1, NSInfo, File10, Offx, Size10, undefined) end || Seq <- lists:seq(1, Extra10)], @@ -286,7 +286,7 @@ witness_smoke_test2() -> File10 = File1, Offx = Off1 + (1 * Size10), {error, partition} = - machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, 1*1000), + machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, NoCSum, 1*1000), ok after diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index dd36787..c7a50e2 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -35,10 +35,14 @@ %% EUNIT TEST DEFINITION eqc_test_() -> - {timeout, 60, + PropTimeout = case os:getenv("EQC_TIME") of + false -> 30; + V -> list_to_integer(V) + end, + {timeout, PropTimeout*2 + 30, {spawn, [ - ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(30, ?QC_OUT(prop_ok())))) + ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(PropTimeout, ?QC_OUT(prop_ok())))) ] }}. diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index fc34a51..74490d2 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -161,9 +161,9 @@ flu_smoke_test() -> Off2 = ?MINIMUM_OFFSET + 77, File2 = "smoke-whole-file^^0^1^1", ok = ?FLU_C:write_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - File2, Off2, Chunk2), + File2, Off2, Chunk2, NoCSum), {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, - BadFile, Off2, Chunk2), + BadFile, Off2, Chunk2, NoCSum), {ok, {[{_, Off2, Chunk2, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, noopt), {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, @@ -262,7 +262,7 @@ bad_checksum_test() -> try Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, - BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:hash(sha, ".................")}, {error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, Prefix, diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 617afff..7f8dcce 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -61,24 +61,26 @@ api_smoke_test() -> {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, MyChunk, NoCSum), - {ok, {[{_, MyOff, MyChunk, _}], []}} = + {ok, {[{_, MyOff, MyChunk, _MyChunkCSUM}], []}} = ?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile, MyOff, MySize, undefined), - MyChunk2 = <<"my chunk data, yeah, again">>, + MyChunk2_parts = [<<"my chunk ">>, "data", <<", yeah, again">>], + MyChunk2 = iolist_to_binary(MyChunk2_parts), Opts1 = #append_opts{chunk_extra=4242}, {ok, {MyOff2,MySize2,MyFile2}} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, - MyChunk2, NoCSum, Opts1, infinity), - {ok, {[{_, MyOff2, MyChunk2, _}], []}} = - ?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile2, MyOff2, MySize2, undefined), - BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")}, + MyChunk2_parts, NoCSum, Opts1, infinity), + [{ok, {[{_, MyOff2, MyChunk2, _}], []}} = + ?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile2, MyOff2, MySize2, DefaultOptions) || + DefaultOptions <- [undefined, noopt, none, any_atom_at_all] ], + + BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:hash(sha, "...................")}, {error, bad_checksum} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, MyChunk, BadCSum), - Opts2 = #append_opts{chunk_extra=99832}, -io:format(user, "\nTODO: fix write_chunk() call below @ ~s LINE ~w\n", [?MODULE,?LINE]), - %% {error, bad_checksum} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, - %% <<"foo-file^^0^1^1">>, - %% MyChunk, BadCSum, - %% Opts2, infinity), + {error, bad_checksum} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, + MyFile2, + MyOff2 + size(MyChunk2), + MyChunk, BadCSum, + infinity), %% Put kick_projection_reaction() in the middle of the test so %% that any problems with its async nature will (hopefully) @@ -283,20 +285,20 @@ flu_restart_test2() -> fun(run) -> ok = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1, - Data, infinity), + Data, NoCSum, infinity), ok; (line) -> io:format("line ~p, ", [?LINE]); (stop) -> ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1, - Data, infinity) + Data, NoCSum, infinity) end, fun(run) -> {error, written} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1, - Dataxx, infinity), + Dataxx, NoCSum, infinity), ok; (line) -> io:format("line ~p, ", [?LINE]); (stop) -> ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1, - Dataxx, infinity) + Dataxx, NoCSum, infinity) end ],