From 6961930b0f1682e6f74e65178f955e69f9ef26b2 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Mon, 19 Oct 2015 09:40:05 +0900 Subject: [PATCH 1/2] Update eper --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 782fa10..5a48783 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,7 @@ {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}, {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}}, {node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}}, - {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.78"}}} + {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}} ]}. {sub_dirs, ["rel", "apps/machi"]}. -- 2.45.2 From 3e975f53b8bfa9c63bb2387613c85f196e75c601 Mon Sep 17 00:00:00 2001 From: UENISHI Kota Date: Mon, 19 Oct 2015 12:09:39 +0900 Subject: [PATCH 2/2] Allow read_chunk() to return partial chunks This is simply a change of read_chunk() protocol, where a response of read_chunk() becomes list of written bytes along with checksum. All related code including repair is changed as such. This is to pass all tests and not actually supporting partial chunks. --- src/machi.proto | 45 ++++++------- src/machi_admin_util.erl | 11 +++- src/machi_chain_repair.erl | 7 +- src/machi_cr_client.erl | 34 +++++++--- src/machi_file_proxy.erl | 39 ++++++----- src/machi_flu1.erl | 5 +- src/machi_pb_high_client.erl | 33 ++++++---- src/machi_pb_translate.erl | 93 +++++++++++++++++---------- test/machi_cr_client_test.erl | 30 +++++---- test/machi_file_proxy_eqc.erl | 7 +- test/machi_file_proxy_test.erl | 2 +- test/machi_flu1_test.erl | 6 +- test/machi_pb_high_client_test.erl | 4 +- test/machi_proxy_flu1_client_test.erl | 8 ++- 14 files changed, 196 insertions(+), 128 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index 338f40f..2cec895 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -86,6 +86,14 @@ message Mpb_ChunkCSum { optional bytes csum = 2; } +message Mpb_Chunk { + required uint64 offset = 1; + required string file_name = 2; + required bytes chunk = 3; + // TODO: must be required, in future? + optional Mpb_ChunkCSum csum = 4; +} + // epoch_id() type message Mpb_EpochID { required uint32 epoch_number = 1; @@ -177,10 +185,7 @@ message Mpb_AppendChunkResp { // High level API: write_chunk() request & response message Mpb_WriteChunkReq { - required string file = 1; - required uint64 offset = 2; - required bytes chunk = 3; - required Mpb_ChunkCSum csum = 4; + required Mpb_Chunk chunk = 1; } message Mpb_WriteChunkResp { @@ -190,33 +195,27 @@ message Mpb_WriteChunkResp { // High level API: read_chunk() request & response message Mpb_ReadChunkReq { - required string file = 1; - required uint64 offset = 2; - required uint32 size = 3; + required Mpb_ChunkPos chunk_pos = 1; // Use flag_no_checksum=non-zero to skip returning the chunk's checksum. // TODO: not implemented yet. - optional uint32 flag_no_checksum = 4 [default=0]; + optional uint32 flag_no_checksum = 2 [default=0]; // Use flag_no_chunk=non-zero to skip returning the chunk (which // only makes sense if flag_no_checksum is not set). // TODO: not implemented yet. - optional uint32 flag_no_chunk = 5 [default=0]; + optional uint32 flag_no_chunk = 3 [default=0]; } message Mpb_ReadChunkResp { required Mpb_GeneralStatusCode status = 1; - optional bytes chunk = 2; - optional Mpb_ChunkCSum csum = 3; + repeated Mpb_Chunk chunks = 2; } - // High level API: trim_chunk() request & response message Mpb_TrimChunkReq { - required string file = 1; - required uint64 offset = 2; - required uint32 size = 3; + required Mpb_ChunkPos chunk_pos = 1; } message Mpb_TrimChunkResp { @@ -390,10 +389,7 @@ message Mpb_LL_AppendChunkResp { message Mpb_LL_WriteChunkReq { required Mpb_EpochID epoch_id = 1; - required string file = 2; - required uint64 offset = 3; - required bytes chunk = 4; - required Mpb_ChunkCSum csum = 5; + required Mpb_Chunk chunk = 2; } message Mpb_LL_WriteChunkResp { @@ -404,24 +400,21 @@ message Mpb_LL_WriteChunkResp { message Mpb_LL_ReadChunkReq { required Mpb_EpochID epoch_id = 1; - required string file = 2; - required uint64 offset = 3; - required uint32 size = 4; + required Mpb_ChunkPos chunk_pos = 2; // Use flag_no_checksum=non-zero to skip returning the chunk's checksum. // TODO: not implemented yet. - optional uint32 flag_no_checksum = 5 [default=0]; + optional uint32 flag_no_checksum = 3 [default=0]; // Use flag_no_chunk=non-zero to skip returning the chunk (which // only makes sense if flag_checksum is not set). // TODO: not implemented yet. - optional uint32 flag_no_chunk = 6 [default=0]; + optional uint32 flag_no_chunk = 4 [default=0]; } message Mpb_LL_ReadChunkResp { required Mpb_GeneralStatusCode status = 1; - optional bytes chunk = 2; - optional Mpb_ChunkCSum csum = 3; + repeated Mpb_Chunk chunks = 2; } // Low level API: checksum_list() diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index 770a566..f40fa3e 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -73,8 +73,13 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) -> {ok, FH} -> File = re:replace(Path, ".*/", "", [{return, binary}]), try - ReadChunk = fun(_File, Offset, Size) -> - file:pread(FH, Offset, Size) + ReadChunk = fun(F, Offset, Size) -> + case file:pread(FH, Offset, Size) of + {ok, Bin} -> + {ok, [{F, Offset, Bin, undefined}]}; + Err -> + Err + end end, verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) after @@ -112,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) -> verify_chunk_checksum(File, ReadChunk) -> fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> case ReadChunk(File, Offset, Size) of - {ok, Chunk} -> + {ok, [{_, Offset, Chunk, _}]} -> CSum2 = machi_util:checksum_chunk(Chunk), if CSum == CSum2 -> Acc; diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 055f07d..946c4c7 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -323,9 +323,10 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> _ -> ok end, _T1 = os:timestamp(), - {ok, Chunk} = machi_proxy_flu1_client:read_chunk( - SrcP, EpochID, File, Offset, Size, - ?SHORT_TIMEOUT), + {ok, [{_, Offset, Chunk, _}]} = + machi_proxy_flu1_client:read_chunk( + SrcP, EpochID, File, Offset, Size, + ?SHORT_TIMEOUT), _T2 = os:timestamp(), <<_Tag:1/binary, CSum/binary>> = TaggedCSum, case machi_util:checksum_chunk(Chunk) of diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 4c93deb..91f3a23 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -529,9 +529,12 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO, ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of - {ok, Chunk} when byte_size(Chunk) == Size -> - {reply, {ok, Chunk}, S}; + {ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size -> + {reply, {ok, Chunks}, S}; + {ok, Chunks} when is_list(Chunks) -> + {reply, {ok, Chunks}, S}; {ok, BadChunk} -> + %% TODO cleaner handling of bad chunks exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); {error, bad_arg} = BadArg -> @@ -596,7 +599,7 @@ read_repair2(cp_mode=ConsistencyMode, Tail = lists:last(readonly_flus(P)), case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of - {ok, Chunk} when byte_size(Chunk) == Size -> + {ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size -> ToRepair = mutation_flus(P) -- [Tail], read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, Size, Depth, STime, S); @@ -620,10 +623,23 @@ read_repair2(ap_mode=ConsistencyMode, #state{proj=P}=S) -> Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of - {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> + {ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size -> ToRepair = mutation_flus(P) -- [GotItFrom], - read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, - Offset, Size, Depth, STime, S); + %% TODO: stop matching single-size list + %% {RepairedChunks, S2} = + %% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) -> + %% Size0 = byte_size(Chunk0), + %% {reply, {ok, Chunk1}, S1} = + %% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File, + %% Offset0, Size0, Depth, STime, S0), + %% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1} + %% end, + %% {[], S}, Chunks), + %% {reply, {ok, RepairedChunks}, S2}; + {reply, {ok, RepairedChunk}, S2} + = read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0, + Offset0, Size, Depth, STime, S), + {reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2}; {ok, BadChunk, _GotItFrom} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); @@ -856,14 +872,14 @@ try_to_find_chunk(Eligible, File, Offset, Size, Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, Size) of - {ok, Chunk} when byte_size(Chunk) == Size -> - {FLU, {ok, Chunk}}; + {ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size -> + {FLU, {ok, Chunks}}; Else -> {FLU, Else} end end, Rs = run_middleworker_job(Work, Eligible, Timeout), - case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of + case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of [{FoundFLU, {ok, Chunk}}|_] -> {ok, Chunk, FoundFLU}; [] -> diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index 8a850de..11fdf2a 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -132,8 +132,10 @@ sync(_Pid, Type) -> % @doc Read file at offset for length -spec read(Pid :: pid(), Offset :: non_neg_integer(), - Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} | - {error, Reason :: term()}. + Length :: non_neg_integer()) -> + {ok, [{Filename::string(), Offset :: non_neg_integer(), + Data :: binary(), Checksum :: binary()}]} | + {error, Reason :: term()}. read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 andalso is_integer(Length) andalso Length > 0 -> gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT); @@ -279,14 +281,19 @@ handle_call({read, Offset, Length}, _From, undefined end, - {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of - {ok, Bytes, Csum} -> - {{ok, Bytes, Csum}, Err}; - eof -> - {{error, not_written}, Err + 1}; - Error -> - {Error, Err + 1} - end, + {Resp, NewErr} = + case handle_read(FH, F, Checksum, Offset, Length, U) of + {ok, Chunks} -> + %% Kludge to wrap read result in tuples, to support fragmented read + %% XXX FIXME + %% For now we are omiting the checksum data because it blows up + %% protobufs. + {{ok, Chunks}, Err}; + eof -> + {{error, not_written}, Err + 1}; + Error -> + {Error, Err + 1} + end, {reply, Resp, State#state{reads = {T+1, NewErr}}}; %%% WRITES @@ -542,7 +549,7 @@ handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) -> do_read(FHd, Filename, TaggedCsum, Offset, Size) -> case file:pread(FHd, Offset, Size) of - eof -> + eof -> eof; {ok, Bytes} when byte_size(Bytes) == Size -> {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum), @@ -552,11 +559,11 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) -> [Bad, Ck]), {error, bad_checksum}; TaggedCsum -> - {ok, Bytes, TaggedCsum}; - %% XXX FIXME: Should we return something other than - %% {ok, ....} in this case? + {ok, [{Filename, Offset, Bytes, TaggedCsum}]}; OtherCsum when Tag =:= ?CSUM_TAG_NONE -> - {ok, Bytes, OtherCsum} + %% XXX FIXME: Should we return something other than + %% {ok, ....} in this case? + {ok, [{Filename, Offset, Bytes, OtherCsum}]} end; {ok, Partial} -> lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", @@ -608,7 +615,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) -> lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written", [Offset, Filename]), {error, server_insanity}; - {ok, _, _} -> + {ok, _} -> {ok, U}; _ -> {error, written} diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 24a19a0..c8c144e 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -534,10 +534,7 @@ do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})-> ok -> {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), case machi_file_proxy:read(Pid, Offset, Size) of - %% XXX FIXME - %% For now we are omiting the checksum data because it blows up - %% protobufs. - {ok, Data, _Csum} -> {ok, Data}; + {ok, Chunks} -> {ok, Chunks}; Other -> Other end; _ -> diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 505f6b1..1d54d07 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -260,10 +260,11 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum}, try ReqID = <>, CSumT = convert_csum_req(CSum, Chunk), - Req = #mpb_writechunkreq{file=File, - offset=Offset, - chunk=Chunk, - csum=CSumT}, + Req = #mpb_writechunkreq{chunk= + #mpb_chunk{chunk=Chunk, + file_name=File, + offset=Offset, + csum=CSumT}}, R1a = #mpb_request{req_id=ReqID, do_not_alter=1, write_chunk=Req}, Bin1a = machi_pb:encode_mpb_request(R1a), @@ -285,9 +286,9 @@ do_send_sync2({read_chunk, File, Offset, Size}, #state{sock=Sock, sock_id=Index, count=Count}=S) -> try ReqID = <>, - Req = #mpb_readchunkreq{file=File, - offset=Offset, - size=Size}, + Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}}, R1a = #mpb_request{req_id=ReqID, do_not_alter=1, read_chunk=Req}, Bin1a = machi_pb:encode_mpb_request(R1a), @@ -309,9 +310,9 @@ do_send_sync2({trim_chunk, File, Offset, Size}, #state{sock=Sock, sock_id=Index, count=Count}=S) -> try ReqID = <>, - Req = #mpb_trimchunkreq{file=File, - offset=Offset, - size=Size}, + Req = #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}}, R1a = #mpb_request{req_id=ReqID, do_not_alter=1, trim_chunk=Req}, Bin1a = machi_pb:encode_mpb_request(R1a), @@ -415,8 +416,16 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) -> convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) -> convert_general_status_code(Status). -convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) -> - {ok, Chunk}; +convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) -> + Chunks = lists:map(fun(#mpb_chunk{offset=Offset, + file_name=File, + chunk=Chunk, + csum=#mpb_chunkcsum{type=T, csum=Ck}}) -> + %% TODO: cleanup export + Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>, + {File, Offset, Chunk, Csum} + end, PB_Chunks), + {ok, Chunks}; convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) -> convert_general_status_code(Status). diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index bbed40b..0c5392e 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -15,8 +15,8 @@ %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. -%% %% ------------------------------------------------------------------- +%% -module(machi_pb_translate). @@ -37,6 +37,9 @@ to_pb_response/3 ]). +%% TODO: fixme cleanup +-export([conv_to_csum_tag/1]). + from_pb_request(#mpb_ll_request{ req_id=ReqID, echo=#mpb_echoreq{message=Msg}}) -> @@ -62,10 +65,10 @@ from_pb_request(#mpb_ll_request{ req_id=ReqID, write_chunk=#mpb_ll_writechunkreq{ epoch_id=PB_EpochID, - file=File, - offset=Offset, - chunk=Chunk, - csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) -> + chunk=#mpb_chunk{file_name=File, + offset=Offset, + chunk=Chunk, + csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) -> EpochID = conv_to_epoch_id(PB_EpochID), CSum_tag = conv_to_csum_tag(CSum_type), {ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}}; @@ -73,14 +76,15 @@ from_pb_request(#mpb_ll_request{ req_id=ReqID, read_chunk=#mpb_ll_readchunkreq{ epoch_id=PB_EpochID, - file=File, - offset=Offset, - size=Size, + chunk_pos=ChunkPos, flag_no_checksum=PB_GetNoChecksum, flag_no_chunk=PB_GetNoChunk}}) -> EpochID = conv_to_epoch_id(PB_EpochID), Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)}, {no_chunk, conv_to_boolean(PB_GetNoChunk)}], + #mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size} = ChunkPos, {ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, @@ -163,23 +167,23 @@ from_pb_request(#mpb_request{req_id=ReqID, ChunkExtra}}; from_pb_request(#mpb_request{req_id=ReqID, write_chunk=IR=#mpb_writechunkreq{}}) -> - #mpb_writechunkreq{file=File, - offset=Offset, - chunk=Chunk, - csum=CSum} = IR, + #mpb_writechunkreq{chunk=#mpb_chunk{file_name=File, + offset=Offset, + chunk=Chunk, + csum=CSum}} = IR, TaggedCSum = make_tagged_csum(CSum, Chunk), {ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}}; from_pb_request(#mpb_request{req_id=ReqID, read_chunk=IR=#mpb_readchunkreq{}}) -> - #mpb_readchunkreq{file=File, - offset=Offset, - size=Size} = IR, + #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}} = IR, {ReqID, {high_read_chunk, File, Offset, Size}}; from_pb_request(#mpb_request{req_id=ReqID, trim_chunk=IR=#mpb_trimchunkreq{}}) -> - #mpb_trimchunkreq{file=File, - offset=Offset, - size=Size} = IR, + #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, + offset=Offset, + chunk_size=Size}} = IR, {ReqID, {high_trim_chunk, File, Offset, Size}}; from_pb_request(#mpb_request{req_id=ReqID, checksum_list=IR=#mpb_checksumlistreq{}}) -> @@ -229,10 +233,17 @@ from_pb_response(#mpb_ll_response{ from_pb_response(#mpb_ll_response{ req_id=ReqID, read_chunk=#mpb_ll_readchunkresp{status=Status, - chunk=Chunk}}) -> + chunks=PB_Chunks}}) -> case Status of 'OK' -> - {ReqID, {ok, Chunk}}; + Chunks = lists:map(fun(#mpb_chunk{file_name=File, + offset=Offset, + chunk=Bytes, + csum=#mpb_chunkcsum{type=T,csum=Ck}}) -> + Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>, + {File, Offset, Bytes, Csum} + end, PB_Chunks), + {ReqID, {ok, Chunks}}; _ -> {ReqID, machi_pb_high_client:convert_general_status_code(Status)} end; @@ -367,10 +378,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C #mpb_ll_request{req_id=ReqID, do_not_alter=2, write_chunk=#mpb_ll_writechunkreq{ epoch_id=PB_EpochID, - file=File, - offset=Offset, - chunk=Chunk, - csum=PB_CSum}}; + chunk=#mpb_chunk{file_name=File, + offset=Offset, + chunk=Chunk, + csum=PB_CSum}}}; to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) -> %% TODO: stop ignoring Opts ^_^ PB_EpochID = conv_from_epoch_id(EpochID), @@ -378,9 +389,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) -> req_id=ReqID, do_not_alter=2, read_chunk=#mpb_ll_readchunkreq{ epoch_id=PB_EpochID, - file=File, - offset=Offset, - size=Size}}; + chunk_pos=#mpb_chunkpos{ + file_name=File, + offset=Offset, + chunk_size=Size}}}; to_pb_request(ReqID, {low_checksum_list, EpochID, File}) -> PB_EpochID = conv_from_epoch_id(EpochID), #mpb_ll_request{req_id=ReqID, do_not_alter=2, @@ -466,12 +478,18 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)-> write_chunk=#mpb_ll_writechunkresp{status=Status}}; to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> case Resp of - {ok, Chunk} -> - CSum = undefined, % TODO not implemented + {ok, Chunks} -> + PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) -> + {Tag, Ck} = machi_util:unmake_tagged_csum(Csum), + #mpb_chunk{file_name=File, + offset=Offset, + chunk=Bytes, + csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), + csum=Ck}} + end, Chunks), #mpb_ll_response{req_id=ReqID, read_chunk=#mpb_ll_readchunkresp{status='OK', - chunk=Chunk, - csum=CSum}}; + chunks=PB_Chunks}}; {error, _}=Error -> Status = conv_from_status(Error), #mpb_ll_response{req_id=ReqID, @@ -638,10 +656,19 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R end; to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> case Resp of - {ok, Chunk} -> + {ok, Chunks} -> + MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) -> + {Tag, Ck} = machi_util:unmake_tagged_csum(Csum), + io:format(user, "~p oom~n", [Csum]), + #mpb_chunk{ + offset=Offset, + file_name=File, + chunk=Bytes, + csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}} + end, Chunks), #mpb_response{req_id=ReqID, read_chunk=#mpb_readchunkresp{status='OK', - chunk=Chunk}}; + chunks=MpbChunks}}; {error, _}=Error -> Status = conv_from_status(Error), #mpb_response{req_id=ReqID, diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index d92dabf..41f1bda 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -113,12 +113,13 @@ smoke_test2() -> Chunk1_badcs = {<>, Chunk1}, {error, bad_checksum} = machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), - {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, private), %% Verify that the client's CR wrote to all of them. - [{ok, Chunk1} = machi_flu1_client:read_chunk( - Host, PortBase+X, EpochID, File1, Off1, Size1) || + [{ok, [{_, Off1, Chunk1, _}]} = + machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, File1, Off1, Size1) || X <- [0,1,2] ], %% Test read repair: Manually write to head, then verify that @@ -129,8 +130,8 @@ smoke_test2() -> File1, FooOff1, Size1) || X <- [0,1,2] ], ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, File1, FooOff1, Chunk1), - {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), - [{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk( + {ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), + [{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, File1, FooOff1, Size1)} || X <- [0,1,2] ], @@ -140,8 +141,8 @@ smoke_test2() -> Size2 = size(Chunk2), ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, File1, FooOff2, Chunk2), - {ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), - [{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk( + {ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), + [{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, File1, FooOff2, Size2)} || X <- [0,1,2] ], @@ -165,7 +166,8 @@ smoke_test2() -> {ok, {Off10,Size10,File10}} = machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, Extra10 * Size10), - {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10), + {ok, [{_, Off10, Chunk10, _}]} = + machi_cr_client:read_chunk(C1, File10, Off10, Size10), [begin Offx = Off10 + (Seq * Size10), %% TODO: uncomment written/not_written enforcement is available. @@ -173,8 +175,8 @@ smoke_test2() -> %% Offx, Size10), {ok, {Offx,Size10,File10}} = machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), - {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx, - Size10) + {ok, [{_, Offx, Chunk10, _}]} = + machi_cr_client:read_chunk(C1, File10, Offx, Size10) end || Seq <- lists:seq(1, Extra10)], {ok, {Off11,Size11,File11}} = machi_cr_client:append_chunk(C1, Prefix, Chunk10), @@ -196,7 +198,8 @@ witness_smoke_test2() -> {error, {already_started, P1}} -> P1; Other -> error(Other) end, - error_logger:tty(false), + %% TODO: I wonder why commenting this out makes this test pass + %% error_logger:tty(true), try Prefix = <<"pre">>, Chunk1 = <<"yochunk">>, @@ -215,7 +218,7 @@ witness_smoke_test2() -> Chunk1_badcs = {<>, Chunk1}, {error, bad_checksum} = machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), - {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), %% Stop 'b' and let the chain reset. ok = machi_flu_psup:stop_flu_package(b), @@ -241,7 +244,8 @@ witness_smoke_test2() -> end, %% Chunk1 is still readable: not affected by wedged witness head. - {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, [{_, Off1, Chunk1, _}]} = + machi_cr_client:read_chunk(C1, File1, Off1, Size1), %% But because the head is wedged, an append will fail. {error, partition} = machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000), diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index 39b0852..725f4ac 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -201,7 +201,12 @@ read_post(S, [_Pid, Off, L], Res) -> read_next(S, _Res, _Args) -> S. read(Pid, Offset, Length) -> - machi_file_proxy:read(Pid, Offset, Length). + case machi_file_proxy:read(Pid, Offset, Length) of + {ok, [{_, Offset, Data, Csum}]} -> + {ok, Data, Csum}; + E -> + E + end. %% write diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index bcb0085..ea6f784 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -92,7 +92,7 @@ machi_file_proxy_test_() -> ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), - ?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)), + ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 084086a..ac3ba33 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -97,8 +97,8 @@ flu_smoke_test() -> {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, Prefix, Chunk1), - {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, - File1, Off1, Len1), + {ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, + File1, Off1, Len1), {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1), true = is_binary(KludgeBin), @@ -151,7 +151,7 @@ flu_smoke_test() -> File2, Off2, Chunk2), {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, BadFile, Off2, Chunk2), - {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, + {ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2), {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 36387b5..1f5a1bf 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -80,7 +80,9 @@ smoke_test2() -> {iolist_to_binary(Chunk2), File2, Off2, Size2}, {iolist_to_binary(Chunk3), File3, Off3, Size3}], [begin - {ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz) + File = binary_to_list(Fl), + ?assertMatch({ok, [{File, Off, Ch, _}]}, + ?C:read_chunk(Clnt, Fl, Off, Sz)) end || {Ch, Fl, Off, Sz} <- Reads], {ok, KludgeBin} = ?C:checksum_list(Clnt, File1), diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index d843879..324d3b4 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -60,12 +60,14 @@ api_smoke_test() -> {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, infinity), - {ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), + {ok, [{_, MyOff, MyChunk, _}]} = + ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), MyChunk2 = <<"my chunk data, yeah, again">>, {ok, {MyOff2,MySize2,MyFile2}} = ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix, MyChunk2, 4242, infinity), - {ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2), + {ok, [{_, MyOff2, MyChunk2, _}]} = + ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2), MyChunk_badcs = {<>, MyChunk}, {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk_badcs), @@ -240,7 +242,7 @@ flu_restart_test() -> (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, <<"prefix">>, Data, 42, infinity) end, - fun(run) -> {ok, Data} = + fun(run) -> {ok, [{_, Off1, Data, _}]} = ?MUT:read_chunk(Prox1, FakeEpoch, File1, Off1, Size1), ok; -- 2.45.2