WIP: write_chunk #1

This commit is contained in:
Scott Lystig Fritchie 2015-06-23 15:13:13 +09:00
parent cb06c53dc0
commit a8782eed5a
3 changed files with 59 additions and 21 deletions

View file

@ -74,7 +74,7 @@ message Mpb_ChunkCSum {
// epoch_id() type
message Mpb_EpochId {
required uint32 epoch_num = 1;
required Mpb_ChunkCSum epoch_csum = 2;
required bytes epoch_csum = 2;
}
//////////////////////////////////////////
@ -135,12 +135,11 @@ message Mpb_WriteChunkReq {
required string file = 1;
required uint64 offset = 2;
required bytes chunk = 3;
required Mpb_ChunkCSum csum = 4;
}
message Mpb_WriteChunkResp {
required Mpb_GeneralStatusCode status = 1;
// If OK, then chunk_pos is defined.
optional Mpb_ChunkPos chunk_pos = 2;
}
//////////////////////////////////////////

View file

@ -38,7 +38,8 @@
connected_p/1,
echo/2, echo/3,
auth/3, auth/4,
append_chunk/6, append_chunk/7
append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6
]).
%% gen_server callbacks
@ -82,6 +83,12 @@ append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra) ->
append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra, Timeout) ->
send_sync(PidSpec, {append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, Timeout).
write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT).
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
send_sync(PidSpec, Cmd, Timeout) ->
gen_server:call(PidSpec, {send_sync, Cmd}, Timeout).
@ -195,14 +202,7 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
PK = if PlacementKey == <<>> -> undefined;
true -> PlacementKey
end,
CSumT = case CSum of
none ->
#mpb_chunkcsum{type='CSUM_TAG_NONE',
csum=undefined};
{client_sha, CSumBin} ->
#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA',
csum=CSumBin}
end,
CSumT = convert_csum_req(CSum),
Req = #mpb_appendchunkreq{placement_key=PK,
prefix=Prefix,
chunk=Chunk,
@ -224,20 +224,59 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({write_chunk, File, Offset, Chunk, CSum},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
CSumT = convert_csum_req(CSum),
Req = #mpb_writechunkreq{file=File,
offset=Offset,
chunk=Chunk,
csum=CSumT},
R1a = #mpb_request{req_id=ReqID,
write_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
case (catch machi_pb:decode_mpb_response(Bin1B)) of
#mpb_response{req_id=ReqID, write_chunk=R} when R /= undefined ->
Result = convert_write_chunk_resp(R),
{Result, S#state{count=Count+1}};
#mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
end
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end.
convert_csum_req(none) ->
#mpb_chunkcsum{type='CSUM_TAG_NONE',
csum=undefined};
convert_csum_req({client_sha, CSumBin}) ->
#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA',
csum=CSumBin}.
convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) ->
#mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP,
{ok, {Offset, Size, File}};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_ARG'}) ->
convert_append_chunk_resp(#mpb_appendchunkresp{status=Status}) ->
convert_general_status_code(Status).
convert_general_status_code('BAD_ARG') ->
{error, bad_arg};
convert_append_chunk_resp(#mpb_appendchunkresp{status='WEDGED'}) ->
convert_general_status_code('WEDGED') ->
{error, wedged};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_CHECKSUM'}) ->
convert_general_status_code('BAD_CHECKSUM') ->
{error, bad_checksum};
convert_append_chunk_resp(#mpb_appendchunkresp{status='PARTITION'}) ->
convert_general_status_code('PARTITION') ->
{error, partition};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_JOSS'}) ->
convert_general_status_code('BAD_JOSS') ->
throw({error, bad_joss_taipan_fixme}).
convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
ok;
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status).

View file

@ -69,9 +69,9 @@ smoke_test2() ->
CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)},
{ok, {Off2, Size2, File2}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024),
%% Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]],
%% {ok, {Off3, Size3, File3}} =
%% ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2), Chunk3),
Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]],
ok = ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2),
Chunk3, none),
ok
after