Allow read_chunk() to return partial chunks

This is simply a change of read_chunk() protocol, where a response of
read_chunk() becomes list of written bytes along with checksum. All
related code including repair is changed as such. This is to pass all
tests and not actually supporting partial chunks.
This commit is contained in:
UENISHI Kota 2015-10-19 12:09:39 +09:00
parent 6961930b0f
commit 3e975f53b8
14 changed files with 196 additions and 128 deletions

View file

@ -86,6 +86,14 @@ message Mpb_ChunkCSum {
optional bytes csum = 2; optional bytes csum = 2;
} }
message Mpb_Chunk {
required uint64 offset = 1;
required string file_name = 2;
required bytes chunk = 3;
// TODO: must be required, in future?
optional Mpb_ChunkCSum csum = 4;
}
// epoch_id() type // epoch_id() type
message Mpb_EpochID { message Mpb_EpochID {
required uint32 epoch_number = 1; required uint32 epoch_number = 1;
@ -177,10 +185,7 @@ message Mpb_AppendChunkResp {
// High level API: write_chunk() request & response // High level API: write_chunk() request & response
message Mpb_WriteChunkReq { message Mpb_WriteChunkReq {
required string file = 1; required Mpb_Chunk chunk = 1;
required uint64 offset = 2;
required bytes chunk = 3;
required Mpb_ChunkCSum csum = 4;
} }
message Mpb_WriteChunkResp { message Mpb_WriteChunkResp {
@ -190,33 +195,27 @@ message Mpb_WriteChunkResp {
// High level API: read_chunk() request & response // High level API: read_chunk() request & response
message Mpb_ReadChunkReq { message Mpb_ReadChunkReq {
required string file = 1; required Mpb_ChunkPos chunk_pos = 1;
required uint64 offset = 2;
required uint32 size = 3;
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum. // Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_checksum = 4 [default=0]; optional uint32 flag_no_checksum = 2 [default=0];
// Use flag_no_chunk=non-zero to skip returning the chunk (which // Use flag_no_chunk=non-zero to skip returning the chunk (which
// only makes sense if flag_no_checksum is not set). // only makes sense if flag_no_checksum is not set).
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_chunk = 5 [default=0]; optional uint32 flag_no_chunk = 3 [default=0];
} }
message Mpb_ReadChunkResp { message Mpb_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
optional bytes chunk = 2; repeated Mpb_Chunk chunks = 2;
optional Mpb_ChunkCSum csum = 3;
} }
// High level API: trim_chunk() request & response // High level API: trim_chunk() request & response
message Mpb_TrimChunkReq { message Mpb_TrimChunkReq {
required string file = 1; required Mpb_ChunkPos chunk_pos = 1;
required uint64 offset = 2;
required uint32 size = 3;
} }
message Mpb_TrimChunkResp { message Mpb_TrimChunkResp {
@ -390,10 +389,7 @@ message Mpb_LL_AppendChunkResp {
message Mpb_LL_WriteChunkReq { message Mpb_LL_WriteChunkReq {
required Mpb_EpochID epoch_id = 1; required Mpb_EpochID epoch_id = 1;
required string file = 2; required Mpb_Chunk chunk = 2;
required uint64 offset = 3;
required bytes chunk = 4;
required Mpb_ChunkCSum csum = 5;
} }
message Mpb_LL_WriteChunkResp { message Mpb_LL_WriteChunkResp {
@ -404,24 +400,21 @@ message Mpb_LL_WriteChunkResp {
message Mpb_LL_ReadChunkReq { message Mpb_LL_ReadChunkReq {
required Mpb_EpochID epoch_id = 1; required Mpb_EpochID epoch_id = 1;
required string file = 2; required Mpb_ChunkPos chunk_pos = 2;
required uint64 offset = 3;
required uint32 size = 4;
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum. // Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_checksum = 5 [default=0]; optional uint32 flag_no_checksum = 3 [default=0];
// Use flag_no_chunk=non-zero to skip returning the chunk (which // Use flag_no_chunk=non-zero to skip returning the chunk (which
// only makes sense if flag_checksum is not set). // only makes sense if flag_checksum is not set).
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_chunk = 6 [default=0]; optional uint32 flag_no_chunk = 4 [default=0];
} }
message Mpb_LL_ReadChunkResp { message Mpb_LL_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
optional bytes chunk = 2; repeated Mpb_Chunk chunks = 2;
optional Mpb_ChunkCSum csum = 3;
} }
// Low level API: checksum_list() // Low level API: checksum_list()

View file

@ -73,8 +73,13 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
{ok, FH} -> {ok, FH} ->
File = re:replace(Path, ".*/", "", [{return, binary}]), File = re:replace(Path, ".*/", "", [{return, binary}]),
try try
ReadChunk = fun(_File, Offset, Size) -> ReadChunk = fun(F, Offset, Size) ->
file:pread(FH, Offset, Size) case file:pread(FH, Offset, Size) of
{ok, Bin} ->
{ok, [{F, Offset, Bin, undefined}]};
Err ->
Err
end
end, end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
after after
@ -112,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
verify_chunk_checksum(File, ReadChunk) -> verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
case ReadChunk(File, Offset, Size) of case ReadChunk(File, Offset, Size) of
{ok, Chunk} -> {ok, [{_, Offset, Chunk, _}]} ->
CSum2 = machi_util:checksum_chunk(Chunk), CSum2 = machi_util:checksum_chunk(Chunk),
if CSum == CSum2 -> if CSum == CSum2 ->
Acc; Acc;

View file

@ -323,7 +323,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
_ -> ok _ -> ok
end, end,
_T1 = os:timestamp(), _T1 = os:timestamp(),
{ok, Chunk} = machi_proxy_flu1_client:read_chunk( {ok, [{_, Offset, Chunk, _}]} =
machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size, SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT), ?SHORT_TIMEOUT),
_T2 = os:timestamp(), _T2 = os:timestamp(),

View file

@ -529,9 +529,12 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
ConsistencyMode = P#projection_v1.mode, ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size -> {ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size ->
{reply, {ok, Chunk}, S}; {reply, {ok, Chunks}, S};
{ok, Chunks} when is_list(Chunks) ->
{reply, {ok, Chunks}, S};
{ok, BadChunk} -> {ok, BadChunk} ->
%% TODO cleaner handling of bad chunks
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)}); got, byte_size(BadChunk)});
{error, bad_arg} = BadArg -> {error, bad_arg} = BadArg ->
@ -596,7 +599,7 @@ read_repair2(cp_mode=ConsistencyMode,
Tail = lists:last(readonly_flus(P)), Tail = lists:last(readonly_flus(P)),
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size -> {ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size ->
ToRepair = mutation_flus(P) -- [Tail], ToRepair = mutation_flus(P) -- [Tail],
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
Size, Depth, STime, S); Size, Depth, STime, S);
@ -620,10 +623,23 @@ read_repair2(ap_mode=ConsistencyMode,
#state{proj=P}=S) -> #state{proj=P}=S) ->
Eligible = mutation_flus(P), Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> {ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size ->
ToRepair = mutation_flus(P) -- [GotItFrom], ToRepair = mutation_flus(P) -- [GotItFrom],
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, %% TODO: stop matching single-size list
Offset, Size, Depth, STime, S); %% {RepairedChunks, S2} =
%% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) ->
%% Size0 = byte_size(Chunk0),
%% {reply, {ok, Chunk1}, S1} =
%% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File,
%% Offset0, Size0, Depth, STime, S0),
%% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1}
%% end,
%% {[], S}, Chunks),
%% {reply, {ok, RepairedChunks}, S2};
{reply, {ok, RepairedChunk}, S2}
= read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0,
Offset0, Size, Depth, STime, S),
{reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2};
{ok, BadChunk, _GotItFrom} -> {ok, BadChunk, _GotItFrom} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)}); Offset, Size, got, byte_size(BadChunk)});
@ -856,14 +872,14 @@ try_to_find_chunk(Eligible, File, Offset, Size,
Proxy = orddict:fetch(FLU, PD), Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID, case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of File, Offset, Size) of
{ok, Chunk} when byte_size(Chunk) == Size -> {ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
{FLU, {ok, Chunk}}; {FLU, {ok, Chunks}};
Else -> Else ->
{FLU, Else} {FLU, Else}
end end
end, end,
Rs = run_middleworker_job(Work, Eligible, Timeout), Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] -> [{FoundFLU, {ok, Chunk}}|_] ->
{ok, Chunk, FoundFLU}; {ok, Chunk, FoundFLU};
[] -> [] ->

View file

@ -132,7 +132,9 @@ sync(_Pid, Type) ->
% @doc Read file at offset for length % @doc Read file at offset for length
-spec read(Pid :: pid(), -spec read(Pid :: pid(),
Offset :: non_neg_integer(), Offset :: non_neg_integer(),
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} | Length :: non_neg_integer()) ->
{ok, [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}]} |
{error, Reason :: term()}. {error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0 -> andalso is_integer(Length) andalso Length > 0 ->
@ -279,9 +281,14 @@ handle_call({read, Offset, Length}, _From,
undefined undefined
end, end,
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of {Resp, NewErr} =
{ok, Bytes, Csum} -> case handle_read(FH, F, Checksum, Offset, Length, U) of
{{ok, Bytes, Csum}, Err}; {ok, Chunks} ->
%% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME
%% For now we are omiting the checksum data because it blows up
%% protobufs.
{{ok, Chunks}, Err};
eof -> eof ->
{{error, not_written}, Err + 1}; {{error, not_written}, Err + 1};
Error -> Error ->
@ -552,11 +559,11 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
[Bad, Ck]), [Bad, Ck]),
{error, bad_checksum}; {error, bad_checksum};
TaggedCsum -> TaggedCsum ->
{ok, Bytes, TaggedCsum}; {ok, [{Filename, Offset, Bytes, TaggedCsum}]};
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
%% XXX FIXME: Should we return something other than %% XXX FIXME: Should we return something other than
%% {ok, ....} in this case? %% {ok, ....} in this case?
OtherCsum when Tag =:= ?CSUM_TAG_NONE -> {ok, [{Filename, Offset, Bytes, OtherCsum}]}
{ok, Bytes, OtherCsum}
end; end;
{ok, Partial} -> {ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@ -608,7 +615,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written", lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
[Offset, Filename]), [Offset, Filename]),
{error, server_insanity}; {error, server_insanity};
{ok, _, _} -> {ok, _} ->
{ok, U}; {ok, U};
_ -> _ ->
{error, written} {error, written}

View file

@ -534,10 +534,7 @@ do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
ok -> ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size) of case machi_file_proxy:read(Pid, Offset, Size) of
%% XXX FIXME {ok, Chunks} -> {ok, Chunks};
%% For now we are omiting the checksum data because it blows up
%% protobufs.
{ok, Data, _Csum} -> {ok, Data};
Other -> Other Other -> Other
end; end;
_ -> _ ->

View file

@ -260,10 +260,11 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
try try
ReqID = <<Index:64/big, Count:64/big>>, ReqID = <<Index:64/big, Count:64/big>>,
CSumT = convert_csum_req(CSum, Chunk), CSumT = convert_csum_req(CSum, Chunk),
Req = #mpb_writechunkreq{file=File, Req = #mpb_writechunkreq{chunk=
#mpb_chunk{chunk=Chunk,
file_name=File,
offset=Offset, offset=Offset,
chunk=Chunk, csum=CSumT}},
csum=CSumT},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1, R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
write_chunk=Req}, write_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a), Bin1a = machi_pb:encode_mpb_request(R1a),
@ -285,9 +286,9 @@ do_send_sync2({read_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) -> #state{sock=Sock, sock_id=Index, count=Count}=S) ->
try try
ReqID = <<Index:64/big, Count:64/big>>, ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_readchunkreq{file=File, Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
size=Size}, chunk_size=Size}},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1, R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
read_chunk=Req}, read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a), Bin1a = machi_pb:encode_mpb_request(R1a),
@ -309,9 +310,9 @@ do_send_sync2({trim_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) -> #state{sock=Sock, sock_id=Index, count=Count}=S) ->
try try
ReqID = <<Index:64/big, Count:64/big>>, ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_trimchunkreq{file=File, Req = #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
size=Size}, chunk_size=Size}},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1, R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
trim_chunk=Req}, trim_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a), Bin1a = machi_pb:encode_mpb_request(R1a),
@ -415,8 +416,16 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) -> convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) -> convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
{ok, Chunk}; Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
file_name=File,
chunk=Chunk,
csum=#mpb_chunkcsum{type=T, csum=Ck}}) ->
%% TODO: cleanup export
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Chunk, Csum}
end, PB_Chunks),
{ok, Chunks};
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) -> convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).

View file

@ -15,8 +15,8 @@
%% KIND, either express or implied. See the License for the %% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations %% specific language governing permissions and limitations
%% under the License. %% under the License.
%%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%%
-module(machi_pb_translate). -module(machi_pb_translate).
@ -37,6 +37,9 @@
to_pb_response/3 to_pb_response/3
]). ]).
%% TODO: fixme cleanup
-export([conv_to_csum_tag/1]).
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) -> echo=#mpb_echoreq{message=Msg}}) ->
@ -62,10 +65,10 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
write_chunk=#mpb_ll_writechunkreq{ write_chunk=#mpb_ll_writechunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
file=File, chunk=#mpb_chunk{file_name=File,
offset=Offset, offset=Offset,
chunk=Chunk, chunk=Chunk,
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) -> csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) ->
EpochID = conv_to_epoch_id(PB_EpochID), EpochID = conv_to_epoch_id(PB_EpochID),
CSum_tag = conv_to_csum_tag(CSum_type), CSum_tag = conv_to_csum_tag(CSum_type),
{ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}}; {ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
@ -73,14 +76,15 @@ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
read_chunk=#mpb_ll_readchunkreq{ read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
file=File, chunk_pos=ChunkPos,
offset=Offset,
size=Size,
flag_no_checksum=PB_GetNoChecksum, flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk}}) -> flag_no_chunk=PB_GetNoChunk}}) ->
EpochID = conv_to_epoch_id(PB_EpochID), EpochID = conv_to_epoch_id(PB_EpochID),
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)}, Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
{no_chunk, conv_to_boolean(PB_GetNoChunk)}], {no_chunk, conv_to_boolean(PB_GetNoChunk)}],
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size} = ChunkPos,
{ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}}; {ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
@ -163,23 +167,23 @@ from_pb_request(#mpb_request{req_id=ReqID,
ChunkExtra}}; ChunkExtra}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}) -> write_chunk=IR=#mpb_writechunkreq{}}) ->
#mpb_writechunkreq{file=File, #mpb_writechunkreq{chunk=#mpb_chunk{file_name=File,
offset=Offset, offset=Offset,
chunk=Chunk, chunk=Chunk,
csum=CSum} = IR, csum=CSum}} = IR,
TaggedCSum = make_tagged_csum(CSum, Chunk), TaggedCSum = make_tagged_csum(CSum, Chunk),
{ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}}; {ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) -> read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{file=File, #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
size=Size} = IR, chunk_size=Size}} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}}; {ReqID, {high_read_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) -> trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{file=File, #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
size=Size} = IR, chunk_size=Size}} = IR,
{ReqID, {high_trim_chunk, File, Offset, Size}}; {ReqID, {high_trim_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) -> checksum_list=IR=#mpb_checksumlistreq{}}) ->
@ -229,10 +233,17 @@ from_pb_response(#mpb_ll_response{
from_pb_response(#mpb_ll_response{ from_pb_response(#mpb_ll_response{
req_id=ReqID, req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status=Status, read_chunk=#mpb_ll_readchunkresp{status=Status,
chunk=Chunk}}) -> chunks=PB_Chunks}}) ->
case Status of case Status of
'OK' -> 'OK' ->
{ReqID, {ok, Chunk}}; Chunks = lists:map(fun(#mpb_chunk{file_name=File,
offset=Offset,
chunk=Bytes,
csum=#mpb_chunkcsum{type=T,csum=Ck}}) ->
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Bytes, Csum}
end, PB_Chunks),
{ReqID, {ok, Chunks}};
_ -> _ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)} {ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end; end;
@ -367,10 +378,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
#mpb_ll_request{req_id=ReqID, do_not_alter=2, #mpb_ll_request{req_id=ReqID, do_not_alter=2,
write_chunk=#mpb_ll_writechunkreq{ write_chunk=#mpb_ll_writechunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
file=File, chunk=#mpb_chunk{file_name=File,
offset=Offset, offset=Offset,
chunk=Chunk, chunk=Chunk,
csum=PB_CSum}}; csum=PB_CSum}}};
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) -> to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
%% TODO: stop ignoring Opts ^_^ %% TODO: stop ignoring Opts ^_^
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
@ -378,9 +389,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
req_id=ReqID, do_not_alter=2, req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{ read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
file=File, chunk_pos=#mpb_chunkpos{
file_name=File,
offset=Offset, offset=Offset,
size=Size}}; chunk_size=Size}}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) -> to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2, #mpb_ll_request{req_id=ReqID, do_not_alter=2,
@ -466,12 +478,18 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
write_chunk=#mpb_ll_writechunkresp{status=Status}}; write_chunk=#mpb_ll_writechunkresp{status=Status}};
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
case Resp of case Resp of
{ok, Chunk} -> {ok, Chunks} ->
CSum = undefined, % TODO not implemented PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{file_name=File,
offset=Offset,
chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
csum=Ck}}
end, Chunks),
#mpb_ll_response{req_id=ReqID, #mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status='OK', read_chunk=#mpb_ll_readchunkresp{status='OK',
chunk=Chunk, chunks=PB_Chunks}};
csum=CSum}};
{error, _}=Error -> {error, _}=Error ->
Status = conv_from_status(Error), Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID, #mpb_ll_response{req_id=ReqID,
@ -638,10 +656,19 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
end; end;
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of case Resp of
{ok, Chunk} -> {ok, Chunks} ->
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
io:format(user, "~p oom~n", [Csum]),
#mpb_chunk{
offset=Offset,
file_name=File,
chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
end, Chunks),
#mpb_response{req_id=ReqID, #mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status='OK', read_chunk=#mpb_readchunkresp{status='OK',
chunk=Chunk}}; chunks=MpbChunks}};
{error, _}=Error -> {error, _}=Error ->
Status = conv_from_status(Error), Status = conv_from_status(Error),
#mpb_response{req_id=ReqID, #mpb_response{req_id=ReqID,

View file

@ -113,11 +113,12 @@ smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1}, Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} = {error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private), private),
%% Verify that the client's CR wrote to all of them. %% Verify that the client's CR wrote to all of them.
[{ok, Chunk1} = machi_flu1_client:read_chunk( [{ok, [{_, Off1, Chunk1, _}]} =
machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) || Host, PortBase+X, EpochID, File1, Off1, Size1) ||
X <- [0,1,2] ], X <- [0,1,2] ],
@ -129,8 +130,8 @@ smoke_test2() ->
File1, FooOff1, Size1) || X <- [0,1,2] ], File1, FooOff1, Size1) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Chunk1), File1, FooOff1, Chunk1),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), {ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
[{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk( [{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ], File1, FooOff1, Size1)} || X <- [0,1,2] ],
@ -140,8 +141,8 @@ smoke_test2() ->
Size2 = size(Chunk2), Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2), File1, FooOff2, Chunk2),
{ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), {ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk( [{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
File1, FooOff2, Size2)} || X <- [0,1,2] ], File1, FooOff2, Size2)} || X <- [0,1,2] ],
@ -165,7 +166,8 @@ smoke_test2() ->
{ok, {Off10,Size10,File10}} = {ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10), Extra10 * Size10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10), {ok, [{_, Off10, Chunk10, _}]} =
machi_cr_client:read_chunk(C1, File10, Off10, Size10),
[begin [begin
Offx = Off10 + (Seq * Size10), Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available. %% TODO: uncomment written/not_written enforcement is available.
@ -173,8 +175,8 @@ smoke_test2() ->
%% Offx, Size10), %% Offx, Size10),
{ok, {Offx,Size10,File10}} = {ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx, {ok, [{_, Offx, Chunk10, _}]} =
Size10) machi_cr_client:read_chunk(C1, File10, Offx, Size10)
end || Seq <- lists:seq(1, Extra10)], end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} = {ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10), machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@ -196,7 +198,8 @@ witness_smoke_test2() ->
{error, {already_started, P1}} -> P1; {error, {already_started, P1}} -> P1;
Other -> error(Other) Other -> error(Other)
end, end,
error_logger:tty(false), %% TODO: I wonder why commenting this out makes this test pass
%% error_logger:tty(true),
try try
Prefix = <<"pre">>, Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>, Chunk1 = <<"yochunk">>,
@ -215,7 +218,7 @@ witness_smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1}, Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} = {error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
%% Stop 'b' and let the chain reset. %% Stop 'b' and let the chain reset.
ok = machi_flu_psup:stop_flu_package(b), ok = machi_flu_psup:stop_flu_package(b),
@ -241,7 +244,8 @@ witness_smoke_test2() ->
end, end,
%% Chunk1 is still readable: not affected by wedged witness head. %% Chunk1 is still readable: not affected by wedged witness head.
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, [{_, Off1, Chunk1, _}]} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1),
%% But because the head is wedged, an append will fail. %% But because the head is wedged, an append will fail.
{error, partition} = {error, partition} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000), machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),

View file

@ -201,7 +201,12 @@ read_post(S, [_Pid, Off, L], Res) ->
read_next(S, _Res, _Args) -> S. read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) -> read(Pid, Offset, Length) ->
machi_file_proxy:read(Pid, Offset, Length). case machi_file_proxy:read(Pid, Offset, Length) of
{ok, [{_, Offset, Data, Csum}]} ->
{ok, Data, Csum};
E ->
E
end.
%% write %% write

View file

@ -92,7 +92,7 @@ machi_file_proxy_test_() ->
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)), ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))

View file

@ -97,7 +97,7 @@ flu_smoke_test() ->
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
Prefix, Chunk1), Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1), File1, Off1, Len1),
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort, {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1), ?DUMMY_PV1_EPOCH, File1),
@ -151,7 +151,7 @@ flu_smoke_test() ->
File2, Off2, Chunk2), File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2), BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2), File2, Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,

View file

@ -80,7 +80,9 @@ smoke_test2() ->
{iolist_to_binary(Chunk2), File2, Off2, Size2}, {iolist_to_binary(Chunk2), File2, Off2, Size2},
{iolist_to_binary(Chunk3), File3, Off3, Size3}], {iolist_to_binary(Chunk3), File3, Off3, Size3}],
[begin [begin
{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz) File = binary_to_list(Fl),
?assertMatch({ok, [{File, Off, Ch, _}]},
?C:read_chunk(Clnt, Fl, Off, Sz))
end || {Ch, Fl, Off, Sz} <- Reads], end || {Ch, Fl, Off, Sz} <- Reads],
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1), {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),

View file

@ -60,12 +60,14 @@ api_smoke_test() ->
{ok, {MyOff,MySize,MyFile}} = {ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
infinity), infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), {ok, [{_, MyOff, MyChunk, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
MyChunk2 = <<"my chunk data, yeah, again">>, MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} = {ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix, ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
MyChunk2, 4242, infinity), MyChunk2, 4242, infinity),
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2), {ok, [{_, MyOff2, MyChunk2, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk}, MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch, {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
Prefix, MyChunk_badcs), Prefix, MyChunk_badcs),
@ -240,7 +242,7 @@ flu_restart_test() ->
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity) <<"prefix">>, Data, 42, infinity)
end, end,
fun(run) -> {ok, Data} = fun(run) -> {ok, [{_, Off1, Data, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1), File1, Off1, Size1),
ok; ok;