Merge pull request #18 from basho/ku/read-all-valid-chunks

Allow read_chunk() to return partial chunks
This commit is contained in:
Scott Lystig Fritchie 2015-10-19 16:25:43 +09:00
commit ecd7eb195a
15 changed files with 197 additions and 129 deletions

View file

@ -9,7 +9,7 @@
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.78"}}}
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}}
]}.
{sub_dirs, ["rel", "apps/machi"]}.

View file

@ -86,6 +86,14 @@ message Mpb_ChunkCSum {
optional bytes csum = 2;
}
// A single chunk of file data together with its location.
// Used both in write requests and in read_chunk() responses, where a
// single read may now return several (possibly partial) chunks.
message Mpb_Chunk {
// Byte offset of this chunk within the file.
required uint64 offset = 1;
// Name of the file this chunk belongs to.
required string file_name = 2;
// The chunk's data bytes.
required bytes chunk = 3;
// Checksum of the chunk data.
// TODO: make csum required in a future protocol revision?
optional Mpb_ChunkCSum csum = 4;
}
// epoch_id() type
message Mpb_EpochID {
required uint32 epoch_number = 1;
@ -177,10 +185,7 @@ message Mpb_AppendChunkResp {
// High level API: write_chunk() request & response
message Mpb_WriteChunkReq {
required string file = 1;
required uint64 offset = 2;
required bytes chunk = 3;
required Mpb_ChunkCSum csum = 4;
required Mpb_Chunk chunk = 1;
}
message Mpb_WriteChunkResp {
@ -190,33 +195,27 @@ message Mpb_WriteChunkResp {
// High level API: read_chunk() request & response
message Mpb_ReadChunkReq {
required string file = 1;
required uint64 offset = 2;
required uint32 size = 3;
required Mpb_ChunkPos chunk_pos = 1;
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
// TODO: not implemented yet.
optional uint32 flag_no_checksum = 4 [default=0];
optional uint32 flag_no_checksum = 2 [default=0];
// Use flag_no_chunk=non-zero to skip returning the chunk (which
// only makes sense if flag_no_checksum is not set).
// TODO: not implemented yet.
optional uint32 flag_no_chunk = 5 [default=0];
optional uint32 flag_no_chunk = 3 [default=0];
}
message Mpb_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1;
optional bytes chunk = 2;
optional Mpb_ChunkCSum csum = 3;
repeated Mpb_Chunk chunks = 2;
}
// High level API: trim_chunk() request & response
message Mpb_TrimChunkReq {
required string file = 1;
required uint64 offset = 2;
required uint32 size = 3;
required Mpb_ChunkPos chunk_pos = 1;
}
message Mpb_TrimChunkResp {
@ -390,10 +389,7 @@ message Mpb_LL_AppendChunkResp {
message Mpb_LL_WriteChunkReq {
required Mpb_EpochID epoch_id = 1;
required string file = 2;
required uint64 offset = 3;
required bytes chunk = 4;
required Mpb_ChunkCSum csum = 5;
required Mpb_Chunk chunk = 2;
}
message Mpb_LL_WriteChunkResp {
@ -404,24 +400,21 @@ message Mpb_LL_WriteChunkResp {
message Mpb_LL_ReadChunkReq {
required Mpb_EpochID epoch_id = 1;
required string file = 2;
required uint64 offset = 3;
required uint32 size = 4;
required Mpb_ChunkPos chunk_pos = 2;
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
// TODO: not implemented yet.
optional uint32 flag_no_checksum = 5 [default=0];
optional uint32 flag_no_checksum = 3 [default=0];
// Use flag_no_chunk=non-zero to skip returning the chunk (which
// only makes sense if flag_checksum is not set).
// TODO: not implemented yet.
optional uint32 flag_no_chunk = 6 [default=0];
optional uint32 flag_no_chunk = 4 [default=0];
}
message Mpb_LL_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1;
optional bytes chunk = 2;
optional Mpb_ChunkCSum csum = 3;
repeated Mpb_Chunk chunks = 2;
}
// Low level API: checksum_list()

View file

@ -73,8 +73,13 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
{ok, FH} ->
File = re:replace(Path, ".*/", "", [{return, binary}]),
try
ReadChunk = fun(_File, Offset, Size) ->
file:pread(FH, Offset, Size)
ReadChunk = fun(F, Offset, Size) ->
case file:pread(FH, Offset, Size) of
{ok, Bin} ->
{ok, [{F, Offset, Bin, undefined}]};
Err ->
Err
end
end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
after
@ -112,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
case ReadChunk(File, Offset, Size) of
{ok, Chunk} ->
{ok, [{_, Offset, Chunk, _}]} ->
CSum2 = machi_util:checksum_chunk(Chunk),
if CSum == CSum2 ->
Acc;

View file

@ -323,9 +323,10 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
_ -> ok
end,
_T1 = os:timestamp(),
{ok, Chunk} = machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT),
{ok, [{_, Offset, Chunk, _}]} =
machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT),
_T2 = os:timestamp(),
<<_Tag:1/binary, CSum/binary>> = TaggedCSum,
case machi_util:checksum_chunk(Chunk) of

View file

@ -529,9 +529,12 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{reply, {ok, Chunk}, S};
{ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size ->
{reply, {ok, Chunks}, S};
{ok, Chunks} when is_list(Chunks) ->
{reply, {ok, Chunks}, S};
{ok, BadChunk} ->
%% TODO cleaner handling of bad chunks
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)});
{error, bad_arg} = BadArg ->
@ -596,7 +599,7 @@ read_repair2(cp_mode=ConsistencyMode,
Tail = lists:last(readonly_flus(P)),
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size ->
ToRepair = mutation_flus(P) -- [Tail],
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
Size, Depth, STime, S);
@ -620,10 +623,23 @@ read_repair2(ap_mode=ConsistencyMode,
#state{proj=P}=S) ->
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
{ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size ->
ToRepair = mutation_flus(P) -- [GotItFrom],
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
Offset, Size, Depth, STime, S);
%% TODO: stop matching single-size list
%% {RepairedChunks, S2} =
%% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) ->
%% Size0 = byte_size(Chunk0),
%% {reply, {ok, Chunk1}, S1} =
%% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File,
%% Offset0, Size0, Depth, STime, S0),
%% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1}
%% end,
%% {[], S}, Chunks),
%% {reply, {ok, RepairedChunks}, S2};
{reply, {ok, RepairedChunk}, S2}
= read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0,
Offset0, Size, Depth, STime, S),
{reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2};
{ok, BadChunk, _GotItFrom} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)});
@ -856,14 +872,14 @@ try_to_find_chunk(Eligible, File, Offset, Size,
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{FLU, {ok, Chunk}};
{ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
{FLU, {ok, Chunks}};
Else ->
{FLU, Else}
end
end,
Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] ->
{ok, Chunk, FoundFLU};
[] ->

View file

@ -132,8 +132,10 @@ sync(_Pid, Type) ->
% @doc Read file at offset for length
-spec read(Pid :: pid(),
Offset :: non_neg_integer(),
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
{error, Reason :: term()}.
Length :: non_neg_integer()) ->
{ok, [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}]} |
{error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0 ->
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
@ -279,14 +281,19 @@ handle_call({read, Offset, Length}, _From,
undefined
end,
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
{ok, Bytes, Csum} ->
{{ok, Bytes, Csum}, Err};
eof ->
{{error, not_written}, Err + 1};
Error ->
{Error, Err + 1}
end,
{Resp, NewErr} =
case handle_read(FH, F, Checksum, Offset, Length, U) of
{ok, Chunks} ->
%% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{{ok, Chunks}, Err};
eof ->
{{error, not_written}, Err + 1};
Error ->
{Error, Err + 1}
end,
{reply, Resp, State#state{reads = {T+1, NewErr}}};
%%% WRITES
@ -542,7 +549,7 @@ handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
case file:pread(FHd, Offset, Size) of
eof ->
eof ->
eof;
{ok, Bytes} when byte_size(Bytes) == Size ->
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
@ -552,11 +559,11 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
[Bad, Ck]),
{error, bad_checksum};
TaggedCsum ->
{ok, Bytes, TaggedCsum};
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
{ok, [{Filename, Offset, Bytes, TaggedCsum}]};
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
{ok, Bytes, OtherCsum}
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
{ok, [{Filename, Offset, Bytes, OtherCsum}]}
end;
{ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@ -608,7 +615,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
[Offset, Filename]),
{error, server_insanity};
{ok, _, _} ->
{ok, _} ->
{ok, U};
_ ->
{error, written}

View file

@ -534,10 +534,7 @@ do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size) of
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{ok, Data, _Csum} -> {ok, Data};
{ok, Chunks} -> {ok, Chunks};
Other -> Other
end;
_ ->

View file

@ -260,10 +260,11 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
try
ReqID = <<Index:64/big, Count:64/big>>,
CSumT = convert_csum_req(CSum, Chunk),
Req = #mpb_writechunkreq{file=File,
offset=Offset,
chunk=Chunk,
csum=CSumT},
Req = #mpb_writechunkreq{chunk=
#mpb_chunk{chunk=Chunk,
file_name=File,
offset=Offset,
csum=CSumT}},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
write_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
@ -285,9 +286,9 @@ do_send_sync2({read_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_readchunkreq{file=File,
offset=Offset,
size=Size},
Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
@ -309,9 +310,9 @@ do_send_sync2({trim_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_trimchunkreq{file=File,
offset=Offset,
size=Size},
Req = #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
trim_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
@ -415,8 +416,16 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status).
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
{ok, Chunk};
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
file_name=File,
chunk=Chunk,
csum=#mpb_chunkcsum{type=T, csum=Ck}}) ->
%% TODO: cleanup export
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Chunk, Csum}
end, PB_Chunks),
{ok, Chunks};
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status).

View file

@ -15,8 +15,8 @@
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%
-module(machi_pb_translate).
@ -37,6 +37,9 @@
to_pb_response/3
]).
%% TODO: fixme cleanup
-export([conv_to_csum_tag/1]).
from_pb_request(#mpb_ll_request{
req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
@ -62,10 +65,10 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
write_chunk=#mpb_ll_writechunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
chunk=Chunk,
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) ->
chunk=#mpb_chunk{file_name=File,
offset=Offset,
chunk=Chunk,
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
CSum_tag = conv_to_csum_tag(CSum_type),
{ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
@ -73,14 +76,15 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID,
read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
size=Size,
chunk_pos=ChunkPos,
flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
{no_chunk, conv_to_boolean(PB_GetNoChunk)}],
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size} = ChunkPos,
{ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
from_pb_request(#mpb_ll_request{
req_id=ReqID,
@ -163,23 +167,23 @@ from_pb_request(#mpb_request{req_id=ReqID,
ChunkExtra}};
from_pb_request(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}) ->
#mpb_writechunkreq{file=File,
offset=Offset,
chunk=Chunk,
csum=CSum} = IR,
#mpb_writechunkreq{chunk=#mpb_chunk{file_name=File,
offset=Offset,
chunk=Chunk,
csum=CSum}} = IR,
TaggedCSum = make_tagged_csum(CSum, Chunk),
{ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{file=File,
offset=Offset,
size=Size} = IR,
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{file=File,
offset=Offset,
size=Size} = IR,
#mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}} = IR,
{ReqID, {high_trim_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) ->
@ -229,10 +233,17 @@ from_pb_response(#mpb_ll_response{
from_pb_response(#mpb_ll_response{
req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status=Status,
chunk=Chunk}}) ->
chunks=PB_Chunks}}) ->
case Status of
'OK' ->
{ReqID, {ok, Chunk}};
Chunks = lists:map(fun(#mpb_chunk{file_name=File,
offset=Offset,
chunk=Bytes,
csum=#mpb_chunkcsum{type=T,csum=Ck}}) ->
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Bytes, Csum}
end, PB_Chunks),
{ReqID, {ok, Chunks}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
@ -367,10 +378,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
write_chunk=#mpb_ll_writechunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
chunk=Chunk,
csum=PB_CSum}};
chunk=#mpb_chunk{file_name=File,
offset=Offset,
chunk=Chunk,
csum=PB_CSum}}};
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
%% TODO: stop ignoring Opts ^_^
PB_EpochID = conv_from_epoch_id(EpochID),
@ -378,9 +389,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
size=Size}};
chunk_pos=#mpb_chunkpos{
file_name=File,
offset=Offset,
chunk_size=Size}}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
@ -466,12 +478,18 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
write_chunk=#mpb_ll_writechunkresp{status=Status}};
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
case Resp of
{ok, Chunk} ->
CSum = undefined, % TODO not implemented
{ok, Chunks} ->
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{file_name=File,
offset=Offset,
chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
csum=Ck}}
end, Chunks),
#mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status='OK',
chunk=Chunk,
csum=CSum}};
chunks=PB_Chunks}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID,
@ -638,10 +656,19 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
end;
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of
{ok, Chunk} ->
{ok, Chunks} ->
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
io:format(user, "~p oom~n", [Csum]),
#mpb_chunk{
offset=Offset,
file_name=File,
chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
end, Chunks),
#mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status='OK',
chunk=Chunk}};
chunks=MpbChunks}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_response{req_id=ReqID,

View file

@ -113,12 +113,13 @@ smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private),
%% Verify that the client's CR wrote to all of them.
[{ok, Chunk1} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
[{ok, [{_, Off1, Chunk1, _}]} =
machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
X <- [0,1,2] ],
%% Test read repair: Manually write to head, then verify that
@ -129,8 +130,8 @@ smoke_test2() ->
File1, FooOff1, Size1) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Chunk1),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
[{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk(
{ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
[{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ],
@ -140,8 +141,8 @@ smoke_test2() ->
Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2),
{ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
{ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2)} || X <- [0,1,2] ],
@ -165,7 +166,8 @@ smoke_test2() ->
{ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10),
{ok, [{_, Off10, Chunk10, _}]} =
machi_cr_client:read_chunk(C1, File10, Off10, Size10),
[begin
Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available.
@ -173,8 +175,8 @@ smoke_test2() ->
%% Offx, Size10),
{ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx,
Size10)
{ok, [{_, Offx, Chunk10, _}]} =
machi_cr_client:read_chunk(C1, File10, Offx, Size10)
end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@ -196,7 +198,8 @@ witness_smoke_test2() ->
{error, {already_started, P1}} -> P1;
Other -> error(Other)
end,
error_logger:tty(false),
%% TODO: I wonder why commenting this out makes this test pass
%% error_logger:tty(true),
try
Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>,
@ -215,7 +218,7 @@ witness_smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
%% Stop 'b' and let the chain reset.
ok = machi_flu_psup:stop_flu_package(b),
@ -241,7 +244,8 @@ witness_smoke_test2() ->
end,
%% Chunk1 is still readable: not affected by wedged witness head.
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, [{_, Off1, Chunk1, _}]} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1),
%% But because the head is wedged, an append will fail.
{error, partition} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),

View file

@ -201,7 +201,12 @@ read_post(S, [_Pid, Off, L], Res) ->
read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) ->
machi_file_proxy:read(Pid, Offset, Length).
case machi_file_proxy:read(Pid, Offset, Length) of
{ok, [{_, Offset, Data, Csum}]} ->
{ok, Data, Csum};
E ->
E
end.
%% write

View file

@ -92,7 +92,7 @@ machi_file_proxy_test_() ->
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))

View file

@ -97,8 +97,8 @@ flu_smoke_test() ->
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1),
{ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1),
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1),
true = is_binary(KludgeBin),
@ -151,7 +151,7 @@ flu_smoke_test() ->
File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
{ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,

View file

@ -80,7 +80,9 @@ smoke_test2() ->
{iolist_to_binary(Chunk2), File2, Off2, Size2},
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
[begin
{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz)
File = binary_to_list(Fl),
?assertMatch({ok, [{File, Off, Ch, _}]},
?C:read_chunk(Clnt, Fl, Off, Sz))
end || {Ch, Fl, Off, Sz} <- Reads],
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1),

View file

@ -60,12 +60,14 @@ api_smoke_test() ->
{ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
{ok, [{_, MyOff, MyChunk, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
MyChunk2, 4242, infinity),
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
{ok, [{_, MyOff2, MyChunk2, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
Prefix, MyChunk_badcs),
@ -240,7 +242,7 @@ flu_restart_test() ->
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity)
end,
fun(run) -> {ok, Data} =
fun(run) -> {ok, [{_, Off1, Data, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1),
ok;