Add surface of trim chunks to scrub #8

Merged
kuenishi merged 3 commits from ku/trim-high-proto into master 2015-10-14 03:45:07 +00:00
6 changed files with 108 additions and 4 deletions

View file

@ -130,6 +130,7 @@ message Mpb_ErrorResp {
// append_chunk() : Mpb_AppendChunkReq and Mpb_AppendChunkResp // append_chunk() : Mpb_AppendChunkReq and Mpb_AppendChunkResp
// write_chunk() : Mpb_WriteChunkReq and Mpb_WriteChunkResp // write_chunk() : Mpb_WriteChunkReq and Mpb_WriteChunkResp
// read_chunk() : Mpb_ReadChunkReq and Mpb_ReadChunkResp // read_chunk() : Mpb_ReadChunkReq and Mpb_ReadChunkResp
// trim_chunk() : Mpb_TrimChunkReq and Mpb_TrimChunkResp
// checksum_list() : Mpb_ChecksumListReq and Mpb_ChecksumListResp // checksum_list() : Mpb_ChecksumListReq and Mpb_ChecksumListResp
// list_files() : Mpb_ListFilesReq and Mpb_ListFilesResp // list_files() : Mpb_ListFilesReq and Mpb_ListFilesResp
// //
@ -209,6 +210,19 @@ message Mpb_ReadChunkResp {
optional Mpb_ChunkCSum csum = 3; optional Mpb_ChunkCSum csum = 3;
} }
// High level API: trim_chunk() request & response
message Mpb_TrimChunkReq {
required string file = 1;
required uint64 offset = 2;
required uint32 size = 3;
}
message Mpb_TrimChunkResp {
required Mpb_GeneralStatusCode status = 1;
}
// High level API: checksum_list() request & response // High level API: checksum_list() request & response
message Mpb_ChecksumListReq { message Mpb_ChecksumListReq {
@ -258,8 +272,9 @@ message Mpb_Request {
optional Mpb_AppendChunkReq append_chunk = 112; optional Mpb_AppendChunkReq append_chunk = 112;
optional Mpb_WriteChunkReq write_chunk = 113; optional Mpb_WriteChunkReq write_chunk = 113;
optional Mpb_ReadChunkReq read_chunk = 114; optional Mpb_ReadChunkReq read_chunk = 114;
optional Mpb_ChecksumListReq checksum_list = 115; optional Mpb_TrimChunkReq trim_chunk = 115;
optional Mpb_ListFilesReq list_files = 116; optional Mpb_ChecksumListReq checksum_list = 116;
optional Mpb_ListFilesReq list_files = 117;
} }
message Mpb_Response { message Mpb_Response {
@ -281,8 +296,9 @@ message Mpb_Response {
optional Mpb_AppendChunkResp append_chunk = 12; optional Mpb_AppendChunkResp append_chunk = 12;
optional Mpb_WriteChunkResp write_chunk = 13; optional Mpb_WriteChunkResp write_chunk = 13;
optional Mpb_ReadChunkResp read_chunk = 14; optional Mpb_ReadChunkResp read_chunk = 14;
optional Mpb_ChecksumListResp checksum_list = 15; optional Mpb_TrimChunkResp trim_chunk = 15;
optional Mpb_ListFilesResp list_files = 16; optional Mpb_ChecksumListResp checksum_list = 16;
optional Mpb_ListFilesResp list_files = 17;
} }
////////////////////////////////////////// //////////////////////////////////////////

View file

@ -122,6 +122,7 @@
append_chunk_extra/4, append_chunk_extra/5, append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5, write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5, read_chunk/4, read_chunk/5,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3, checksum_list/2, checksum_list/3,
list_files/1, list_files/2, list_files/1, list_files/2,
@ -209,6 +210,18 @@ read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}}, gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
Timeout). Timeout).
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
trim_chunk(PidSpec, File, Offset, Size, Timeout0) ->
{TO, Timeout} = timeout(Timeout0),
gen_server:call(PidSpec, {req, {trim_chunk, File, Offset, Size, TO}},
Timeout).
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, File) -> checksum_list(PidSpec, File) ->
@ -276,6 +289,8 @@ handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S); do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) -> handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S); do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
handle_call2({checksum_list, File, TO}, _From, S) -> handle_call2({checksum_list, File, TO}, _From, S) ->
do_checksum_list(File, 0, os:timestamp(), TO, S); do_checksum_list(File, 0, os:timestamp(), TO, S);
handle_call2({list_files, TO}, _From, S) -> handle_call2({list_files, TO}, _From, S) ->
@ -534,6 +549,10 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end. end.
do_trim_chunk(_File, _Offset, _Size, _Depth, _STime, _TO, S) ->
%% This is just a stub to reach CR client from high level client
{reply, {error, bad_joss}, S}.
%% Read repair: depends on the consistency mode that we're in: %% Read repair: depends on the consistency mode that we're in:
%% %%
%% CP mode: If the head is written, then use it to repair UPI++Repairing. %% CP mode: If the head is written, then use it to repair UPI++Repairing.

View file

@ -439,6 +439,10 @@ do_pb_hl_request2({high_read_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) -> #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size), Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
{Res, S}; {Res, S};
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
{Res, S};
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:checksum_list(Clnt, File), Res = machi_cr_client:checksum_list(Clnt, File),
{Res, S}; {Res, S};

View file

@ -41,6 +41,7 @@
append_chunk/6, append_chunk/7, append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6, write_chunk/5, write_chunk/6,
read_chunk/4, read_chunk/5, read_chunk/4, read_chunk/5,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3, checksum_list/2, checksum_list/3,
list_files/1, list_files/2 list_files/1, list_files/2
]). ]).
@ -99,6 +100,12 @@ read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, Timeout) -> read_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout). send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
trim_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {trim_chunk, File, Offset, Size}, Timeout).
checksum_list(PidSpec, File) -> checksum_list(PidSpec, File) ->
checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT). checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT).
@ -298,6 +305,30 @@ do_send_sync2({read_chunk, File, Offset, Size},
Res = {bummer, {X, Y, erlang:get_stacktrace()}}, Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}} {Res, S#state{count=Count+1}}
end; end;
do_send_sync2({trim_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_trimchunkreq{file=File,
offset=Offset,
size=Size},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
trim_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
case (catch machi_pb:decode_mpb_response(Bin1B)) of
#mpb_response{req_id=ReqID, trim_chunk=R} when R /= undefined ->
Result = convert_trim_chunk_resp(R),
{Result, S#state{count=Count+1}};
#mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
end
catch X:Y ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync2({checksum_list, File}, do_send_sync2({checksum_list, File},
#state{sock=Sock, sock_id=Index, count=Count}=S) -> #state{sock=Sock, sock_id=Index, count=Count}=S) ->
try try
@ -389,6 +420,11 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) -> convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).
convert_trim_chunk_resp(#mpb_trimchunkresp{status='OK'}) ->
ok;
convert_trim_chunk_resp(#mpb_trimchunkresp{status=Status}) ->
convert_general_status_code(Status).
convert_checksum_list_resp(#mpb_checksumlistresp{status='OK', chunk=Chunk}) -> convert_checksum_list_resp(#mpb_checksumlistresp{status='OK', chunk=Chunk}) ->
{ok, Chunk}; {ok, Chunk};
convert_checksum_list_resp(#mpb_checksumlistresp{status=Status}) -> convert_checksum_list_resp(#mpb_checksumlistresp{status=Status}) ->

View file

@ -175,6 +175,12 @@ from_pb_request(#mpb_request{req_id=ReqID,
offset=Offset, offset=Offset,
size=Size} = IR, size=Size} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}}; {ReqID, {high_read_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{file=File,
offset=Offset,
size=Size} = IR,
{ReqID, {high_trim_chunk, File, Offset, Size}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) -> checksum_list=IR=#mpb_checksumlistreq{}}) ->
#mpb_checksumlistreq{file=File} = IR, #mpb_checksumlistreq{file=File} = IR,
@ -643,6 +649,18 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
_Else -> _Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end; end;
to_pb_response(ReqID, {high_trim_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of
ok ->
#mpb_response{req_id=ReqID,
trim_chunk=#mpb_trimchunkresp{status='OK'}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_response{req_id=ReqID,
trim_chunk=#mpb_trimchunkresp{status=Status}};
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_checksum_list, _File}, Resp) -> to_pb_response(ReqID, {high_checksum_list, _File}, Resp) ->
case Resp of case Resp of
{ok, Chunk} -> {ok, Chunk} ->

View file

@ -88,6 +88,17 @@ smoke_test2() ->
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt), {ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
true = is_integer(File1Size), true = is_integer(File1Size),
[begin
%% ok = ?C:trim_chunk(Clnt, Fl, Off, Sz)
%% This gets an error as trim API is still a stub
?assertMatch({bummer,
{throw,
{error, bad_joss_taipan_fixme},
_Boring_stack_trace}},
?C:trim_chunk(Clnt, Fl, Off, Sz))
end || {Ch, Fl, Off, Sz} <- Reads],
?debugVal(?C:list_files(Clnt)),
ok ok
after after
(catch ?C:quit(Clnt)) (catch ?C:quit(Clnt))