WIP: most eunit tests fixed, chain repair intermittently broken

This commit is contained in:
Scott Lystig Fritchie 2016-02-08 22:04:09 +09:00
parent 6e17988ac7
commit fbb0203f67
14 changed files with 165 additions and 144 deletions

View file

@ -105,6 +105,8 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, false),
RepairId = proplists:get_value(repair_id, Opts, id1),
erlang:display(wtf),
%% io:format(user, "TODO: ~p\n", [{error, {What, Why, Stack}}]),
Res = try
_ = [begin
{ok, Proxy} = machi_proxy_flu1_client:start_link(P),
@ -127,6 +129,7 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(
SrcProxy, ?SHORT_TIMEOUT),
%% ?VERB("Make repair directives: "),
erlang:display(yo1),
Ds =
[{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size, EpochID,
@ -146,16 +149,21 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
end || FLU <- OurFLUs],
%% ?VERB("Execute repair directives: "),
erlang:display(yo1),
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
Verb, OurFLUs, ProxiesDict, ETS),
erlang:display(yo2),
%% ?VERB(" done\n"),
lager:info("Repair ~w repair directives finished\n", [RepairId]),
ok
catch
What:Why ->
io:format(user, "yo3 ~p ~p\n", [What,Why]),
Stack = erlang:get_stacktrace(),
io:format(user, "yo3 ~p\n", [Stack]),
{error, {What, Why, Stack}}
after
erlang:display(yo4),
[(catch machi_proxy_flu1_client:quit(Pid)) ||
Pid <- orddict:to_list(get(proxies_dict))]
end,
@ -236,7 +244,7 @@ make_repair_directives(ConsistencyMode, RepairMode, File, Size, _EpochID,
make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS) ->
?VERB("."),
?VERB(".1"),
make_repair_directives3(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, []).
@ -327,17 +335,17 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
F = fun({copy, {Offset, Size, TaggedCSum, MySrc}, MyDsts}, Acc2) ->
SrcP = orddict:fetch(MySrc, ProxiesDict),
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []);
0 -> ?VERB(".2", []);
_ -> ok
end,
_T1 = os:timestamp(),
%% TODO: support case multiple written or trimmed chunks returned
NSInfo = undefined,
io:format(user, "TODO fix broken read_chunk mod ~s line ~w\n", [?MODULE, ?LINE]),
{ok, {[{_, Offset, Chunk, _}], _}} =
{ok, {[{_, Offset, Chunk, _ReadCSum}|OtherChunks], []=_TrimmedList}} =
machi_proxy_flu1_client:read_chunk(
SrcP, NSInfo, EpochID, File, Offset, Size, undefined,
?SHORT_TIMEOUT),
[] = OtherChunks,
_T2 = os:timestamp(),
<<_Tag:1/binary, CSum/binary>> = TaggedCSum,
case machi_util:checksum_chunk(Chunk) of
@ -346,7 +354,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
DstP = orddict:fetch(DstFLU, ProxiesDict),
_T3 = os:timestamp(),
ok = machi_proxy_flu1_client:write_chunk(
DstP, NSInfo, EpochID, File, Offset, Chunk,
DstP, NSInfo, EpochID, File, Offset, Chunk, TaggedCSum,
?SHORT_TIMEOUT),
_T4 = os:timestamp()
end || DstFLU <- MyDsts],
@ -371,7 +379,9 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
Acc2
end
end,
erlang:display({yo,?LINE}),
ok = lists:foldl(F, ok, Cmds),
erlang:display({yo,?LINE}),
%% Copy this file's stats to the total counts.
_ = [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) ||
{L_K, T_K} <- EtsKeys],

View file

@ -63,7 +63,7 @@
%% File API
append_chunk/5,
append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6,
write_chunk/6, write_chunk/7,
read_chunk/6, read_chunk/7,
trim_chunk/5, trim_chunk/6,
checksum_list/2, checksum_list/3,
@ -129,14 +129,14 @@ append_chunk(PidSpec, NSInfo, Prefix, Chunk, CSum, #append_opts{}=Opts, Timeout0
%% allocated/sequenced by an earlier append_chunk() call) to
%% `File' at `Offset'.
write_chunk(PidSpec, NSInfo, File, Offset, Chunk) ->
write_chunk(PidSpec, NSInfo, File, Offset, Chunk, ?DEFAULT_TIMEOUT).
write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
write_chunk(PidSpec, NSInfo, File, Offset, Chunk, Timeout0) ->
write_chunk(PidSpec, NSInfo, File, Offset, Chunk, CSum, Timeout0) ->
{TO, Timeout} = timeout(Timeout0),
gen_server:call(PidSpec, {req, {write_chunk, NSInfo, File, Offset, Chunk, TO}},
gen_server:call(PidSpec, {req, {write_chunk, NSInfo, File, Offset, Chunk, CSum, TO}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
@ -229,8 +229,8 @@ handle_call2({append_chunk, NSInfo,
Prefix, Chunk, CSum, Opts, TO}, _From, S) ->
do_append_head(NSInfo, Prefix,
Chunk, CSum, Opts, 0, os:timestamp(), TO, S);
handle_call2({write_chunk, NSInfo, File, Offset, Chunk, TO}, _From, S) ->
do_write_head(NSInfo, File, Offset, Chunk, 0, os:timestamp(), TO, S);
handle_call2({write_chunk, NSInfo, File, Offset, Chunk, CSum, TO}, _From, S) ->
do_write_head(NSInfo, File, Offset, Chunk, CSum, 0, os:timestamp(), TO, S);
handle_call2({read_chunk, NSInfo, File, Offset, Size, Opts, TO}, _From, S) ->
do_read_chunk(NSInfo, File, Offset, Size, Opts, 0, os:timestamp(), TO, S);
handle_call2({trim_chunk, NSInfo, File, Offset, Size, TO}, _From, S) ->
@ -246,7 +246,6 @@ do_append_head(NSInfo, Prefix,
Chunk, CSum, Opts, Depth + 1, STime, TO, S);
do_append_head(NSInfo, Prefix,
Chunk, CSum, Opts, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "head sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
@ -300,9 +299,9 @@ do_append_head3(NSInfo, Prefix,
case ?FLU_PC:append_chunk(Proxy, NSInfo, EpochID,
Prefix, Chunk, CSum, Opts, ?TIMEOUT) of
{ok, {Offset, _Size, File}=_X} ->
do_append_midtail(RestFLUs, NSInfo, Prefix,
do_wr_app_midtail(RestFLUs, NSInfo, Prefix,
File, Offset, Chunk, CSum, Opts,
[HeadFLU], 0, STime, TO, S);
[HeadFLU], 0, STime, TO, append, S);
{error, bad_checksum}=BadCS ->
{reply, BadCS, S};
{error, Retry}
@ -323,17 +322,16 @@ do_append_head3(NSInfo, Prefix,
Prefix,iolist_size(Chunk)})
end.
do_append_midtail(RestFLUs, NSInfo, Prefix,
do_wr_app_midtail(RestFLUs, NSInfo, Prefix,
File, Offset, Chunk, CSum, Opts,
Ws, Depth, STime, TO, S)
Ws, Depth, STime, TO, MyOp, S)
when RestFLUs == [] orelse Depth == 0 ->
do_append_midtail2(RestFLUs, NSInfo, Prefix,
do_wr_app_midtail2(RestFLUs, NSInfo, Prefix,
File, Offset, Chunk, CSum, Opts,
Ws, Depth + 1, STime, TO, S);
do_append_midtail(_RestFLUs, NSInfo, Prefix, File,
Ws, Depth + 1, STime, TO, MyOp, S);
do_wr_app_midtail(_RestFLUs, NSInfo, Prefix, File,
Offset, Chunk, CSum, Opts,
Ws, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "midtail sleep2,", []),
Ws, Depth, STime, TO, MyOp, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
@ -347,60 +345,66 @@ do_append_midtail(_RestFLUs, NSInfo, Prefix, File,
RestFLUs2 = mutation_flus(P2),
case RestFLUs2 -- Ws of
RestFLUs2 ->
%% None of the writes that we have done so far
%% are to FLUs that are in the RestFLUs2 list.
%% We are pessimistic here and assume that
%% those FLUs are permanently dead. Start
%% over with a new sequencer assignment, at
%% the 2nd have of the impl (we have already
%% slept & refreshed the projection).
if Prefix == undefined -> % atom! not binary()!!
{error, partition};
true ->
MyOp == append ->
%% None of the writes that we have done so
%% far are to FLUs that are in the
%% RestFLUs2 list. We are pessimistic
%% here and assume that those FLUs are
%% permanently dead. Start over with a
%% new sequencer assignment, at the 2nd
%% have of the impl (we have already slept
%% & refreshed the projection).
do_append_head2(NSInfo,
Prefix, Chunk, CSum, Opts,
Depth, STime, TO, S2)
Depth, STime, TO, S2);
MyOp == write ->
do_wr_app_midtail2(RestFLUs2,
NSInfo,
Prefix, File, Offset,
Chunk, CSum, Opts,
Ws, Depth + 1, STime, TO,
MyOp, S2)
end;
RestFLUs3 ->
do_append_midtail2(RestFLUs3,
do_wr_app_midtail2(RestFLUs3,
NSInfo,
Prefix, File, Offset,
Chunk, CSum, Opts,
Ws, Depth + 1, STime, TO, S2)
Ws, Depth + 1, STime, TO,
MyOp, S2)
end
end
end.
do_append_midtail2([], _NSInfo,
do_wr_app_midtail2([], _NSInfo,
_Prefix, File, Offset, Chunk,
_CSum, _Opts, _Ws, _Depth, _STime, _TO, S) ->
%% io:format(user, "ok!\n", []),
_CSum, _Opts, _Ws, _Depth, _STime, _TO, _MyOp, S) ->
{reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S};
do_append_midtail2([FLU|RestFLUs]=FLUs, NSInfo,
do_wr_app_midtail2([FLU|RestFLUs]=FLUs, NSInfo,
Prefix, File, Offset, Chunk,
CSum, Opts, Ws, Depth, STime, TO,
CSum, Opts, Ws, Depth, STime, TO, MyOp,
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of
ok ->
%% io:format(user, "write ~w,", [FLU]),
do_append_midtail2(RestFLUs, NSInfo, Prefix,
do_wr_app_midtail2(RestFLUs, NSInfo, Prefix,
File, Offset, Chunk,
CSum, Opts, [FLU|Ws], Depth, STime, TO, S);
CSum, Opts, [FLU|Ws], Depth, STime, TO, MyOp, S);
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_append_midtail(FLUs, NSInfo, Prefix,
do_wr_app_midtail(FLUs, NSInfo, Prefix,
File, Offset, Chunk,
CSum, Opts, Ws, Depth, STime, TO, S);
CSum, Opts, Ws, Depth, STime, TO, MyOp, S);
{error, written} ->
%% We know what the chunk ought to be, so jump to the
%% middle of read-repair.
Resume = {append, Offset, iolist_size(Chunk), File},
do_repair_chunk(FLUs, Resume, Chunk, [], NSInfo, File, Offset,
do_repair_chunk(FLUs, Resume, Chunk, CSum, [], NSInfo, File, Offset,
iolist_size(Chunk), Depth, STime, S);
{error, trimmed} = Err ->
%% TODO: nothing can be done
@ -426,10 +430,9 @@ witnesses_use_our_epoch([FLU|RestFLUs],
false
end.
do_write_head(NSInfo, File, Offset, Chunk, 0=Depth, STime, TO, S) ->
do_write_head2(NSInfo, File, Offset, Chunk, Depth + 1, STime, TO, S);
do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "head sleep1,", []),
do_write_head(NSInfo, File, Offset, Chunk, CSum, 0=Depth, STime, TO, S) ->
do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth + 1, STime, TO, S);
do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
@ -443,31 +446,32 @@ do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) -
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_write_head(NSInfo, File, Offset, Chunk, Depth + 1,
do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth + 1,
STime, TO, S2);
_ ->
do_write_head2(NSInfo, File, Offset, Chunk, Depth + 1,
do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth + 1,
STime, TO, S2)
end
end.
do_write_head2(NSInfo, File, Offset, Chunk, Depth, STime, TO,
do_write_head2(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of
ok ->
%% From this point onward, we use the same code & logic path as
%% append does.
Prefix=todo_prefix,CSum=todo_csum,Opts=todo_opts,
do_append_midtail(RestFLUs, NSInfo, Prefix,
Prefix=unused_write_path,
Opts=unused_write_path,
do_wr_app_midtail(RestFLUs, NSInfo, Prefix,
File, Offset, Chunk,
CSum, Opts, [HeadFLU], 0, STime, TO, S);
CSum, Opts, [HeadFLU], 0, STime, TO, write, S);
{error, bad_checksum}=BadCS ->
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_write_head(NSInfo, File, Offset, Chunk, Depth, STime, TO, S);
do_write_head(NSInfo, File, Offset, Chunk, CSum, Depth, STime, TO, S);
{error, written}=Err ->
{reply, Err, S};
{error, trimmed}=Err ->
@ -588,7 +592,6 @@ do_trim_midtail(RestFLUs, Prefix, NSInfo, File, Offset, Size,
Ws, Depth + 1, STime, TO, S);
do_trim_midtail(_RestFLUs, Prefix, NSInfo, File, Offset, Size,
Ws, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "midtail sleep2,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
@ -625,7 +628,6 @@ do_trim_midtail(_RestFLUs, Prefix, NSInfo, File, Offset, Size,
do_trim_midtail2([], _Prefix, _NSInfo, _File, _Offset, _Size,
_Ws, _Depth, _STime, _TO, S) ->
%% io:format(user, "ok!\n", []),
{reply, ok, S};
do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, NSInfo, File, Offset, Size,
Ws, Depth, STime, TO,
@ -633,7 +635,6 @@ do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, NSInfo, File, Offset, Size,
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:trim_chunk(Proxy, NSInfo, EpochID, File, Offset, Size, ?TIMEOUT) of
ok ->
%% io:format(user, "write ~w,", [FLU]),
do_trim_midtail2(RestFLUs, Prefix, NSInfo, File, Offset, Size,
[FLU|Ws], Depth, STime, TO, S);
{error, trimmed} ->
@ -748,10 +749,11 @@ read_repair2(ap_mode=ConsistencyMode,
do_repair_chunks([], _, _, _, _, _, _, _, S, Reply) ->
{Reply, S};
do_repair_chunks([{_, Offset, Chunk, _Csum}|T],
do_repair_chunks([{_, Offset, Chunk, CSum}|T],
ToRepair, ReturnMode, [GotItFrom], NSInfo, File, Depth, STime, S, Reply) ->
true = _TODO_fixme = not is_atom(CSum),
Size = iolist_size(Chunk),
case do_repair_chunk(ToRepair, ReturnMode, Chunk, [GotItFrom], NSInfo, File, Offset,
case do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, [GotItFrom], NSInfo, File, Offset,
Size, Depth, STime, S) of
{ok, Chunk, S1} ->
do_repair_chunks(T, ToRepair, ReturnMode, [GotItFrom], NSInfo, File, Depth, STime, S1, Reply);
@ -759,9 +761,8 @@ do_repair_chunks([{_, Offset, Chunk, _Csum}|T],
Error
end.
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset,
do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
@ -771,16 +772,16 @@ do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset,
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File,
do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File,
Offset, Size, Depth + 1, STime, S2);
P2 ->
ToRepair2 = mutation_flus(P2) -- Repaired,
do_repair_chunk2(ToRepair2, ReturnMode, Chunk, Repaired, NSInfo, File,
do_repair_chunk2(ToRepair2, ReturnMode, Chunk, CSum, Repaired, NSInfo, File,
Offset, Size, Depth + 1, STime, S2)
end
end.
do_repair_chunk2([], ReturnMode, Chunk, _Repaired, _NSInfo, File, Offset,
do_repair_chunk2([], ReturnMode, Chunk, _CSum, _Repaired, _NSInfo, File, Offset,
_IgnoreSize, _Depth, _STime, S) ->
%% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc?
case ReturnMode of
@ -789,24 +790,24 @@ do_repair_chunk2([], ReturnMode, Chunk, _Repaired, _NSInfo, File, Offset,
{append, Offset, Size, File} ->
{ok, {Offset, Size, File}, S}
end;
do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File, Offset,
do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(First, PD),
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, ?TIMEOUT) of
case ?FLU_PC:write_chunk(Proxy, NSInfo, EpochID, File, Offset, Chunk, CSum, ?TIMEOUT) of
ok ->
do_repair_chunk2(Rest, ReturnMode, Chunk, [First|Repaired], NSInfo, File,
do_repair_chunk2(Rest, ReturnMode, Chunk, CSum, [First|Repaired], NSInfo, File,
Offset, Size, Depth, STime, S);
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, NSInfo, File,
do_repair_chunk(ToRepair, ReturnMode, Chunk, CSum, Repaired, NSInfo, File,
Offset, Size, Depth, STime, S);
{error, written} ->
%% TODO: To be very paranoid, read the chunk here to verify
%% that it is exactly our Chunk.
do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, NSInfo, File,
do_repair_chunk2(Rest, ReturnMode, Chunk, CSum, Repaired, NSInfo, File,
Offset, Size, Depth, STime, S);
{error, trimmed} = _Error ->
%% TODO
@ -926,11 +927,13 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict,
update_proj2(Count + 1, S);
P when P >= BadProj ->
#projection_v1{epoch_number=Epoch, epoch_csum=CSum,
members_dict=NewMembersDict} = P,
members_dict=NewMembersDict, dbg2=Dbg2} = P,
EpochID = {Epoch, CSum},
?FLU_PC:stop_proxies(ProxiesDict),
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),
S#state{bad_proj=undefined, proj=P, epoch_id=EpochID,
%% Make crash reports shorter by getting rid of 'react' history.
P2 = P#projection_v1{dbg2=lists:keydelete(react, 1, Dbg2)},
S#state{bad_proj=undefined, proj=P2, epoch_id=EpochID,
members_dict=NewMembersDict, proxies_dict=NewProxiesDict};
_P ->
sleep_a_while(Count),

View file

@ -24,11 +24,12 @@
-include("machi_projection.hrl").
-type append_opts() :: #append_opts{}.
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
-type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: 'trimmed' | binary().
-type chunk() :: chunk_bin() | iolist(). % client can choose either rep.
-type chunk_bin() :: binary(). % server returns binary() only.
-type chunk_csum() :: <<>> | chunk_csum_bin() | {csum_tag(), binary()}.
-type chunk_csum_bin() :: binary(). % 1 byte tag, N-1 bytes checksum
-type chunk_cstrm() :: 'trimmed' | chunk_csum().
-type chunk_summary() :: {file_offset(), chunk_size(), chunk_bin(), chunk_cstrm()}.
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'.
@ -62,9 +63,9 @@
chunk/0,
chunk_bin/0,
chunk_csum/0,
csum_tag/0,
chunk_csum_bin/0,
chunk_cstrm/0,
chunk_summary/0,
chunk_s/0,
chunk_pos/0,
chunk_size/0,
error_general/0,

View file

@ -145,7 +145,7 @@
]).
%% For "internal" replication only.
-export([
write_chunk/6, write_chunk/7,
write_chunk/7, write_chunk/8,
trim_chunk/6,
delete_migration/3, delete_migration/4,
trunc_hack/3, trunc_hack/4
@ -216,7 +216,7 @@ append_chunk(Host, TcpPort, NSInfo0, EpochID,
-spec read_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
machi_dt:read_opts_x()) ->
{ok, machi_dt:chunk_s()} |
{ok, {[machi_dt:chunk_summary()], [machi_dt:chunk_pos()]}} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Sock, NSInfo0, EpochID, File, Offset, Size, Opts0)
@ -230,7 +230,7 @@ read_chunk(Sock, NSInfo0, EpochID, File, Offset, Size, Opts0)
-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(),
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
machi_dt:read_opts_x()) ->
{ok, machi_dt:chunk_s()} |
{ok, [machi_dt:chunk_summary()]} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Size, Opts0)
@ -527,25 +527,25 @@ disconnect(_) ->
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) ->
-spec write_chunk(port_wrap(), 'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk(), machi_dt:chunk_csum()) ->
ok | {error, machi_dt:error_general()} | {error, term()}.
write_chunk(Sock, NSInfo0, EpochID, File, Offset, Chunk)
write_chunk(Sock, NSInfo0, EpochID, File, Offset, Chunk, CSum)
when Offset >= ?MINIMUM_OFFSET ->
NSInfo = machi_util:ns_info_default(NSInfo0),
write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk).
write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum).
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(machi_dt:inet_host(), machi_dt:inet_port(),
'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk()) ->
'undefined' | machi_dt:ns_info(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk(), machi_dt:chunk_csum()) ->
ok | {error, machi_dt:error_general()} | {error, term()}.
write_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Chunk)
write_chunk(Host, TcpPort, NSInfo0, EpochID, File, Offset, Chunk, CSum)
when Offset >= ?MINIMUM_OFFSET ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
NSInfo = machi_util:ns_info_default(NSInfo0),
write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk)
write_chunk2(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum)
after
disconnect(Sock)
end.
@ -641,19 +641,19 @@ append_chunk2(Sock, NSInfo, EpochID,
Prefix, Chunk, CSum_tag, CSum, Opts}),
do_pb_request_common(Sock, ReqID, Req, true, Timeout).
write_chunk2(Sock, NSInfo, EpochID, File0, Offset, Chunk0) ->
write_chunk2(Sock, NSInfo, EpochID, File0, Offset, Chunk, CSum0) ->
ReqID = <<"id">>,
#ns_info{version=NSVersion, name=NS} = NSInfo,
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
{Chunk, CSum_tag, CSum} =
case Chunk0 of
X when is_binary(X) ->
{Chunk0, ?CSUM_TAG_NONE, <<>>};
{ChunkCSum, Chk} ->
{Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum),
{Chk, Tag, CS}
end,
{CSum_tag, CSum} = case CSum0 of
<<>> ->
{?CSUM_TAG_NONE, <<>>};
{_Tag, _CS} ->
CSum0;
B when is_binary(B) ->
machi_util:unmake_tagged_csum(CSum0)
end,
Req = machi_pb_translate:to_pb_request(
ReqID,
{low_write_chunk, NSVersion, NS, EpochID, File, Offset, Chunk, CSum_tag, CSum}),

View file

@ -586,12 +586,11 @@ do_pb_hl_request2({high_append_chunk, NS, Prefix, Chunk, TaggedCSum, Opts},
Res = machi_cr_client:append_chunk(Clnt, NSInfo,
Prefix, Chunk, TaggedCSum, Opts),
{Res, S};
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
do_pb_hl_request2({high_write_chunk, File, Offset, Chunk, CSum},
#state{high_clnt=Clnt}=S) ->
NSInfo = undefined,
io:format(user, "TODO fix broken write_chunk mod ~s line ~w\n", [?MODULE, ?LINE]),
Chunk = {TaggedCSum, ChunkBin},
Res = machi_cr_client:write_chunk(Clnt, NSInfo, File, Offset, Chunk),
Res = machi_cr_client:write_chunk(Clnt, NSInfo, File, Offset, Chunk, CSum),
{Res, S};
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
#state{high_clnt=Clnt}=S) ->

View file

@ -102,7 +102,7 @@ auth(PidSpec, User, Pass, Timeout) ->
-spec append_chunk(pid(),
NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(),
Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(),
Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(),
Opts::machi_dt:append_opts()) ->
{ok, Filename::string(), Offset::machi_dt:file_offset()} |
{error, machi_client_error_reason()}.
@ -111,7 +111,7 @@ append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts) ->
-spec append_chunk(pid(),
NS::machi_dt:namespace(), Prefix::machi_dt:file_prefix(),
Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(),
Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(),
Opts::machi_dt:append_opts(),
Timeout::non_neg_integer()) ->
{ok, Filename::string(), Offset::machi_dt:file_offset()} |
@ -120,13 +120,13 @@ append_chunk(PidSpec, NS, Prefix, Chunk, CSum, Opts, Timeout) ->
send_sync(PidSpec, {append_chunk, NS, Prefix, Chunk, CSum, Opts}, Timeout).
-spec write_chunk(pid(), File::string(), machi_dt:file_offset(),
Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum()) ->
Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum()) ->
ok | {error, machi_client_error_reason()}.
write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT).
-spec write_chunk(pid(), File::string(), machi_dt:file_offset(),
Chunk::machi_dt:chunk_bin(), CSum::machi_dt:chunk_csum(), Timeout::non_neg_integer()) ->
Chunk::machi_dt:chunk(), CSum::machi_dt:chunk_csum(), Timeout::non_neg_integer()) ->
ok | {error, machi_client_error_reason()}.
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).

View file

@ -196,9 +196,9 @@ from_pb_request(#mpb_request{req_id=ReqID,
#mpb_writechunkreq{chunk=#mpb_chunk{file_name=File,
offset=Offset,
chunk=Chunk,
csum=CSum}} = IR,
TaggedCSum = make_tagged_csum(CSum, Chunk),
{ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
csum=CSumRec}} = IR,
CSum = make_tagged_csum(CSumRec, Chunk),
{ReqID, {high_write_chunk, File, Offset, Chunk, CSum}};
from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
@ -732,7 +732,7 @@ to_pb_response(ReqID, {high_append_chunk, _NS, _Prefix, _Chunk, _TSum, _O}, Resp
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, Resp) ->
to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _CSum}, Resp) ->
case Resp of
{ok, {_,_,_}} ->
%% machi_cr_client returns ok 2-tuple, convert to simple ok.

View file

@ -81,7 +81,7 @@
quit/1,
%% Internal API
write_chunk/6, write_chunk/7,
write_chunk/7, write_chunk/8,
trim_chunk/6, trim_chunk/7,
%% Helpers
@ -280,14 +280,14 @@ quit(PidSpec) ->
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk) ->
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, infinity).
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, Timeout) ->
case gen_server:call(PidSpec, {req, {write_chunk, NSInfo, EpochID, File, Offset, Chunk}},
write_chunk(PidSpec, NSInfo, EpochID, File, Offset, Chunk, CSum, Timeout) ->
case gen_server:call(PidSpec, {req, {write_chunk, NSInfo, EpochID, File, Offset, Chunk, CSum}},
Timeout) of
{error, written}=Err ->
Size = byte_size(Chunk),
@ -384,9 +384,9 @@ make_req_fun({append_chunk, NSInfo, EpochID,
make_req_fun({read_chunk, NSInfo, EpochID, File, Offset, Size, Opts},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_chunk(Sock, NSInfo, EpochID, File, Offset, Size, Opts) end;
make_req_fun({write_chunk, NSInfo, EpochID, File, Offset, Chunk},
make_req_fun({write_chunk, NSInfo, EpochID, File, Offset, Chunk, CSum},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_chunk(Sock, NSInfo, EpochID, File, Offset, Chunk) end;
fun() -> Mod:write_chunk(Sock, NSInfo, EpochID, File, Offset, Chunk, CSum) end;
make_req_fun({trim_chunk, NSInfo, EpochID, File, Offset, Size},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:trim_chunk(Sock, NSInfo, EpochID, File, Offset, Size) end;

View file

@ -448,6 +448,8 @@ int2bool(I) when is_integer(I) -> true.
read_opts_default(#read_opts{}=NSInfo) ->
NSInfo;
read_opts_default(A) when A == 'undefined'; A == 'noopt'; A == 'none' ->
#read_opts{};
read_opts_default(A) when is_atom(A) ->
#read_opts{}.

View file

@ -484,7 +484,7 @@ eqc_verbose() ->
os:getenv("EQC_VERBOSE") =:= "true".
eqc_timeout(Default) ->
PropTimeout = case os:getenv("EQC_TIMEOUT") of
PropTimeout = case os:getenv("EQC_TIME") of
false -> Default;
V -> list_to_integer(V)
end,

View file

@ -139,7 +139,7 @@ smoke_test2() ->
Host, PortBase+X, NSInfo, EpochID,
File1, FooOff1, Size1, undefined) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, NSInfo, EpochID,
File1, FooOff1, Chunk1),
File1, FooOff1, Chunk1, NoCSum),
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
machi_flu1_client:read_chunk(Host, PortBase+0, NSInfo, EpochID,
File1, FooOff1, Size1, undefined),
@ -156,7 +156,7 @@ smoke_test2() ->
Chunk2 = <<"Middle repair chunk">>,
Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, NSInfo, EpochID,
File1, FooOff2, Chunk2),
File1, FooOff2, Chunk2, NoCSum),
{ok, {[{_, FooOff2, Chunk2, _}], []}} =
machi_cr_client:read_chunk(C1, NSInfo, File1, FooOff2, Size2, undefined),
[{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} =
@ -196,7 +196,7 @@ smoke_test2() ->
%% {error,not_written} = machi_cr_client:read_chunk(C1, NSInfo, File10,
%% Offx, Size10),
{ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10),
machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, NoCSum),
{ok, {[{_, Offx, Chunk10, _}], []}} =
machi_cr_client:read_chunk(C1, NSInfo, File10, Offx, Size10, undefined)
end || Seq <- lists:seq(1, Extra10)],
@ -286,7 +286,7 @@ witness_smoke_test2() ->
File10 = File1,
Offx = Off1 + (1 * Size10),
{error, partition} =
machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, 1*1000),
machi_cr_client:write_chunk(C1, NSInfo, File10, Offx, Chunk10, NoCSum, 1*1000),
ok
after

View file

@ -35,10 +35,14 @@
%% EUNIT TEST DEFINITION
eqc_test_() ->
{timeout, 60,
PropTimeout = case os:getenv("EQC_TIME") of
false -> 30;
V -> list_to_integer(V)
end,
{timeout, PropTimeout*2 + 30,
{spawn,
[
?_assertEqual(true, eqc:quickcheck(eqc:testing_time(30, ?QC_OUT(prop_ok()))))
?_assertEqual(true, eqc:quickcheck(eqc:testing_time(PropTimeout, ?QC_OUT(prop_ok()))))
]
}}.

View file

@ -161,9 +161,9 @@ flu_smoke_test() ->
Off2 = ?MINIMUM_OFFSET + 77,
File2 = "smoke-whole-file^^0^1^1",
ok = ?FLU_C:write_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH,
File2, Off2, Chunk2),
File2, Off2, Chunk2, NoCSum),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2),
BadFile, Off2, Chunk2, NoCSum),
{ok, {[{_, Off2, Chunk2, _}], _}} =
?FLU_C:read_chunk(Host, TcpPort, NSInfo, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, noopt),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
@ -262,7 +262,7 @@ bad_checksum_test() ->
try
Prefix = <<"some prefix">>,
Chunk1 = <<"yo yo yo">>,
BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")},
BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:hash(sha, ".................")},
{error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort, NSInfo,
?DUMMY_PV1_EPOCH,
Prefix,

View file

@ -61,24 +61,26 @@ api_smoke_test() ->
{ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix, MyChunk,
NoCSum),
{ok, {[{_, MyOff, MyChunk, _}], []}} =
{ok, {[{_, MyOff, MyChunk, _MyChunkCSUM}], []}} =
?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile, MyOff, MySize, undefined),
MyChunk2 = <<"my chunk data, yeah, again">>,
MyChunk2_parts = [<<"my chunk ">>, "data", <<", yeah, again">>],
MyChunk2 = iolist_to_binary(MyChunk2_parts),
Opts1 = #append_opts{chunk_extra=4242},
{ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk(Prox1, NSInfo, FakeEpoch, Prefix,
MyChunk2, NoCSum, Opts1, infinity),
{ok, {[{_, MyOff2, MyChunk2, _}], []}} =
?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile2, MyOff2, MySize2, undefined),
BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:sha("foo")},
MyChunk2_parts, NoCSum, Opts1, infinity),
[{ok, {[{_, MyOff2, MyChunk2, _}], []}} =
?MUT:read_chunk(Prox1, NSInfo, FakeEpoch, MyFile2, MyOff2, MySize2, DefaultOptions) ||
DefaultOptions <- [undefined, noopt, none, any_atom_at_all] ],
BadCSum = {?CSUM_TAG_CLIENT_SHA, crypto:hash(sha, "...................")},
{error, bad_checksum} = ?MUT:append_chunk(Prox1, NSInfo, FakeEpoch,
Prefix, MyChunk, BadCSum),
Opts2 = #append_opts{chunk_extra=99832},
io:format(user, "\nTODO: fix write_chunk() call below @ ~s LINE ~w\n", [?MODULE,?LINE]),
%% {error, bad_checksum} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch,
%% <<"foo-file^^0^1^1">>,
%% MyChunk, BadCSum,
%% Opts2, infinity),
{error, bad_checksum} = ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch,
MyFile2,
MyOff2 + size(MyChunk2),
MyChunk, BadCSum,
infinity),
%% Put kick_projection_reaction() in the middle of the test so
%% that any problems with its async nature will (hopefully)
@ -283,20 +285,20 @@ flu_restart_test2() ->
fun(run) ->
ok =
?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1,
Data, infinity),
Data, NoCSum, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1,
Data, infinity)
Data, NoCSum, infinity)
end,
fun(run) ->
{error, written} =
?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1,
Dataxx, infinity),
Dataxx, NoCSum, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, NSInfo, FakeEpoch, File1, Off1,
Dataxx, infinity)
Dataxx, NoCSum, infinity)
end
],