diff --git a/src/machi.proto b/src/machi.proto index d4c3b82..59c9242 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -74,7 +74,7 @@ message Mpb_ChunkCSum { // epoch_id() type message Mpb_EpochId { required uint32 epoch_num = 1; - required Mpb_ChunkCSum epoch_csum = 2; + required bytes epoch_csum = 2; } ////////////////////////////////////////// @@ -135,12 +135,11 @@ message Mpb_WriteChunkReq { required string file = 1; required uint64 offset = 2; required bytes chunk = 3; + required Mpb_ChunkCSum csum = 4; } message Mpb_WriteChunkResp { required Mpb_GeneralStatusCode status = 1; - // If OK, then chunk_pos is defined. - optional Mpb_ChunkPos chunk_pos = 2; } ////////////////////////////////////////// diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 6529bca..b076959 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -38,7 +38,8 @@ connected_p/1, echo/2, echo/3, auth/3, auth/4, - append_chunk/6, append_chunk/7 + append_chunk/6, append_chunk/7, + write_chunk/5, write_chunk/6 ]). %% gen_server callbacks @@ -82,6 +83,12 @@ append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra) -> append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra, Timeout) -> send_sync(PidSpec, {append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, Timeout). +write_chunk(PidSpec, File, Offset, Chunk, CSum) -> + write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT). + +write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> + send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). + send_sync(PidSpec, Cmd, Timeout) -> gen_server:call(PidSpec, {send_sync, Cmd}, Timeout). @@ -195,14 +202,7 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, PK = if PlacementKey == <<>> -> undefined; true -> PlacementKey end, - CSumT = case CSum of - none -> - #mpb_chunkcsum{type='CSUM_TAG_NONE', - csum=undefined}; - {client_sha, CSumBin} -> - #mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', - csum=CSumBin} - end, + CSumT = convert_csum_req(CSum), Req = #mpb_appendchunkreq{placement_key=PK, prefix=Prefix, chunk=Chunk, @@ -224,20 +224,59 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, catch X:Y -> Res = {bummer, {X, Y, erlang:get_stacktrace()}}, {Res, S#state{count=Count+1}} + end; +do_send_sync({write_chunk, File, Offset, Chunk, CSum}, + #state{sock=Sock, sock_id=Index, count=Count}=S) -> + try + ReqID = <>, + CSumT = convert_csum_req(CSum), + Req = #mpb_writechunkreq{file=File, + offset=Offset, + chunk=Chunk, + csum=CSumT}, + R1a = #mpb_request{req_id=ReqID, + write_chunk=Req}, + Bin1a = machi_pb:encode_mpb_request(R1a), + ok = gen_tcp:send(Sock, Bin1a), + {ok, Bin1B} = gen_tcp:recv(Sock, 0), + case (catch machi_pb:decode_mpb_response(Bin1B)) of + #mpb_response{req_id=ReqID, write_chunk=R} when R /= undefined -> + Result = convert_write_chunk_resp(R), + {Result, S#state{count=Count+1}}; + #mpb_response{req_id=ReqID, generic=G} when G /= undefined -> + #mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G, + {{error, {Code, Msg, Extra}}, S#state{count=Count+1}} + end + catch X:Y -> + Res = {bummer, {X, Y, erlang:get_stacktrace()}}, + {Res, S#state{count=Count+1}} end. +convert_csum_req(none) -> + #mpb_chunkcsum{type='CSUM_TAG_NONE', + csum=undefined}; +convert_csum_req({client_sha, CSumBin}) -> + #mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', + csum=CSumBin}. + convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) -> #mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP, {ok, {Offset, Size, File}}; -convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_ARG'}) -> +convert_append_chunk_resp(#mpb_appendchunkresp{status=Status}) -> + convert_general_status_code(Status). + +convert_general_status_code('BAD_ARG') -> {error, bad_arg}; -convert_append_chunk_resp(#mpb_appendchunkresp{status='WEDGED'}) -> +convert_general_status_code('WEDGED') -> {error, wedged}; -convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_CHECKSUM'}) -> +convert_general_status_code('BAD_CHECKSUM') -> {error, bad_checksum}; -convert_append_chunk_resp(#mpb_appendchunkresp{status='PARTITION'}) -> +convert_general_status_code('PARTITION') -> {error, partition}; -convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_JOSS'}) -> +convert_general_status_code('BAD_JOSS') -> throw({error, bad_joss_taipan_fixme}). - +convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) -> + ok; +convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) -> + convert_general_status_code(Status). diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 73b35a5..1a3c0ed 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -69,9 +69,9 @@ smoke_test2() -> CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)}, {ok, {Off2, Size2, File2}} = ?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024), - %% Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]], - %% {ok, {Off3, Size3, File3}} = - %% ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2), Chunk3), + Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]], + ok = ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2), + Chunk3, none), ok after