WIP: read_chunk #1

This commit is contained in:
Scott Lystig Fritchie 2015-06-23 15:34:48 +09:00
parent a8782eed5a
commit 44c22bf752
3 changed files with 67 additions and 3 deletions

View file

@ -142,6 +142,25 @@ message Mpb_WriteChunkResp {
required Mpb_GeneralStatusCode status = 1;
}
// read_chunk() request & response
message Mpb_ReadChunkReq {
required string file = 1;
required uint64 offset = 2;
required uint32 size = 3;
// Use flag_checksum=non-zero to request the chunk's checksum also
optional uint32 flag_checksum = 4 [default=0];
// Use flag_no_chunk=non-zero to skip returning the chunk (which
// only makes sense if flag_checksum is set).
optional uint32 flag_no_chunk = 5 [default=0];
}
message Mpb_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1;
optional bytes chunk = 2;
optional Mpb_ChunkCSum csum = 3;
}
//////////////////////////////////////////
//
// request & response wrapper
@ -163,6 +182,7 @@ message Mpb_Request {
optional Mpb_AuthReq auth = 11;
optional Mpb_AppendChunkReq append_chunk = 12;
optional Mpb_WriteChunkReq write_chunk = 13;
optional Mpb_ReadChunkReq read_chunk = 14;
}
message Mpb_Response {
@ -183,4 +203,5 @@ message Mpb_Response {
optional Mpb_AuthResp auth = 11;
optional Mpb_AppendChunkResp append_chunk = 12;
optional Mpb_WriteChunkResp write_chunk = 13;
optional Mpb_ReadChunkResp read_chunk = 14;
}

View file

@ -39,7 +39,8 @@
echo/2, echo/3,
auth/3, auth/4,
append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6
write_chunk/5, write_chunk/6,
read_chunk/4, read_chunk/5
]).
%% gen_server callbacks
@ -89,6 +90,12 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
send_sync(PidSpec, Cmd, Timeout) ->
gen_server:call(PidSpec, {send_sync, Cmd}, Timeout).
@ -250,6 +257,30 @@ do_send_sync({write_chunk, File, Offset, Chunk, CSum},
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({read_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_readchunkreq{file=File,
offset=Offset,
size=Size},
R1a = #mpb_request{req_id=ReqID,
read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
case (catch machi_pb:decode_mpb_response(Bin1B)) of
#mpb_response{req_id=ReqID, read_chunk=R} when R /= undefined ->
Result = convert_read_chunk_resp(R),
{Result, S#state{count=Count+1}};
#mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
end
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end.
convert_csum_req(none) ->
@ -280,3 +311,8 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
ok;
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status).
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
{ok, Chunk};
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status).

View file

@ -70,8 +70,15 @@ smoke_test2() ->
{ok, {Off2, Size2, File2}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024),
Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]],
ok = ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2),
Chunk3, none),
Off3 = Off2 + iolist_size(Chunk2),
Size3 = iolist_size(Chunk3),
ok = ?C:write_chunk(Clnt, File2, Off3, Chunk3, none),
Reads = [{Chunk1, File1, Off1, Size1},
{Chunk2, File2, Off2, Size2},
{Chunk3, File2, Off3, Size3}],
[{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz) ||
{Ch, Fl, Off, Sz} <- Reads],
ok
after