Add surface of trim chunks to scrub #8
6 changed files with 108 additions and 4 deletions
|
@ -130,6 +130,7 @@ message Mpb_ErrorResp {
|
|||
// append_chunk() : Mpb_AppendChunkReq and Mpb_AppendChunkResp
|
||||
// write_chunk() : Mpb_WriteChunkReq and Mpb_WriteChunkResp
|
||||
// read_chunk() : Mpb_ReadChunkReq and Mpb_ReadChunkResp
|
||||
// trim_chunk() : Mpb_TrimChunkReq and Mpb_TrimChunkResp
|
||||
// checksum_list() : Mpb_ChecksumListReq and Mpb_ChecksumListResp
|
||||
// list_files() : Mpb_ListFilesReq and Mpb_ListFilesResp
|
||||
//
|
||||
|
@ -209,6 +210,19 @@ message Mpb_ReadChunkResp {
|
|||
optional Mpb_ChunkCSum csum = 3;
|
||||
}
|
||||
|
||||
|
||||
// High level API: trim_chunk() request & response
|
||||
|
||||
message Mpb_TrimChunkReq {
|
||||
required string file = 1;
|
||||
required uint64 offset = 2;
|
||||
required uint32 size = 3;
|
||||
}
|
||||
|
||||
message Mpb_TrimChunkResp {
|
||||
required Mpb_GeneralStatusCode status = 1;
|
||||
}
|
||||
|
||||
// High level API: checksum_list() request & response
|
||||
|
||||
message Mpb_ChecksumListReq {
|
||||
|
@ -258,8 +272,9 @@ message Mpb_Request {
|
|||
optional Mpb_AppendChunkReq append_chunk = 112;
|
||||
optional Mpb_WriteChunkReq write_chunk = 113;
|
||||
optional Mpb_ReadChunkReq read_chunk = 114;
|
||||
optional Mpb_ChecksumListReq checksum_list = 115;
|
||||
optional Mpb_ListFilesReq list_files = 116;
|
||||
optional Mpb_TrimChunkReq trim_chunk = 115;
|
||||
optional Mpb_ChecksumListReq checksum_list = 116;
|
||||
optional Mpb_ListFilesReq list_files = 117;
|
||||
}
|
||||
|
||||
message Mpb_Response {
|
||||
|
@ -281,8 +296,9 @@ message Mpb_Response {
|
|||
optional Mpb_AppendChunkResp append_chunk = 12;
|
||||
optional Mpb_WriteChunkResp write_chunk = 13;
|
||||
optional Mpb_ReadChunkResp read_chunk = 14;
|
||||
optional Mpb_ChecksumListResp checksum_list = 15;
|
||||
optional Mpb_ListFilesResp list_files = 16;
|
||||
optional Mpb_TrimChunkResp trim_chunk = 15;
|
||||
optional Mpb_ChecksumListResp checksum_list = 16;
|
||||
optional Mpb_ListFilesResp list_files = 17;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////
|
||||
|
|
|
@ -122,6 +122,7 @@
|
|||
append_chunk_extra/4, append_chunk_extra/5,
|
||||
write_chunk/4, write_chunk/5,
|
||||
read_chunk/4, read_chunk/5,
|
||||
trim_chunk/4, trim_chunk/5,
|
||||
checksum_list/2, checksum_list/3,
|
||||
list_files/1, list_files/2,
|
||||
|
||||
|
@ -209,6 +210,18 @@ read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
|
|||
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
|
||||
Timeout).
|
||||
|
||||
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
trim_chunk(PidSpec, File, Offset, Size) ->
|
||||
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
|
||||
|
||||
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
trim_chunk(PidSpec, File, Offset, Size, Timeout0) ->
|
||||
{TO, Timeout} = timeout(Timeout0),
|
||||
gen_server:call(PidSpec, {req, {trim_chunk, File, Offset, Size, TO}},
|
||||
Timeout).
|
||||
|
||||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
checksum_list(PidSpec, File) ->
|
||||
|
@ -276,6 +289,8 @@ handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
|
|||
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
|
||||
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
|
||||
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
|
||||
handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
|
||||
do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
|
||||
handle_call2({checksum_list, File, TO}, _From, S) ->
|
||||
do_checksum_list(File, 0, os:timestamp(), TO, S);
|
||||
handle_call2({list_files, TO}, _From, S) ->
|
||||
|
@ -534,6 +549,10 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
|
|||
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
|
||||
end.
|
||||
|
||||
do_trim_chunk(_File, _Offset, _Size, _Depth, _STime, _TO, S) ->
|
||||
%% This is just a stub to reach CR client from high level client
|
||||
{reply, {error, bad_joss}, S}.
|
||||
|
||||
%% Read repair: depends on the consistency mode that we're in:
|
||||
%%
|
||||
%% CP mode: If the head is written, then use it to repair UPI++Repairing.
|
||||
|
|
|
@ -439,6 +439,10 @@ do_pb_hl_request2({high_read_chunk, File, Offset, Size},
|
|||
#state{high_clnt=Clnt}=S) ->
|
||||
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
|
||||
{Res, S};
|
||||
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
||||
#state{high_clnt=Clnt}=S) ->
|
||||
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
|
||||
{Res, S};
|
||||
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
|
||||
Res = machi_cr_client:checksum_list(Clnt, File),
|
||||
{Res, S};
|
||||
|
|
|
@ -41,6 +41,7 @@
|
|||
append_chunk/6, append_chunk/7,
|
||||
write_chunk/5, write_chunk/6,
|
||||
read_chunk/4, read_chunk/5,
|
||||
trim_chunk/4, trim_chunk/5,
|
||||
checksum_list/2, checksum_list/3,
|
||||
list_files/1, list_files/2
|
||||
]).
|
||||
|
@ -99,6 +100,12 @@ read_chunk(PidSpec, File, Offset, Size) ->
|
|||
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
|
||||
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
|
||||
|
||||
trim_chunk(PidSpec, File, Offset, Size) ->
|
||||
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
|
||||
|
||||
trim_chunk(PidSpec, File, Offset, Size, Timeout) ->
|
||||
send_sync(PidSpec, {trim_chunk, File, Offset, Size}, Timeout).
|
||||
|
||||
checksum_list(PidSpec, File) ->
|
||||
checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT).
|
||||
|
||||
|
@ -298,6 +305,30 @@ do_send_sync2({read_chunk, File, Offset, Size},
|
|||
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
|
||||
{Res, S#state{count=Count+1}}
|
||||
end;
|
||||
do_send_sync2({trim_chunk, File, Offset, Size},
|
||||
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
||||
try
|
||||
ReqID = <<Index:64/big, Count:64/big>>,
|
||||
Req = #mpb_trimchunkreq{file=File,
|
||||
offset=Offset,
|
||||
size=Size},
|
||||
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
||||
trim_chunk=Req},
|
||||
Bin1a = machi_pb:encode_mpb_request(R1a),
|
||||
ok = gen_tcp:send(Sock, Bin1a),
|
||||
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
|
||||
case (catch machi_pb:decode_mpb_response(Bin1B)) of
|
||||
#mpb_response{req_id=ReqID, trim_chunk=R} when R /= undefined ->
|
||||
Result = convert_trim_chunk_resp(R),
|
||||
{Result, S#state{count=Count+1}};
|
||||
#mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
|
||||
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
|
||||
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
|
||||
end
|
||||
catch X:Y ->
|
||||
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
|
||||
{Res, S#state{count=Count+1}}
|
||||
end;
|
||||
do_send_sync2({checksum_list, File},
|
||||
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
||||
try
|
||||
|
@ -389,6 +420,11 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
|
|||
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
|
||||
convert_general_status_code(Status).
|
||||
|
||||
convert_trim_chunk_resp(#mpb_trimchunkresp{status='OK'}) ->
|
||||
ok;
|
||||
convert_trim_chunk_resp(#mpb_trimchunkresp{status=Status}) ->
|
||||
convert_general_status_code(Status).
|
||||
|
||||
convert_checksum_list_resp(#mpb_checksumlistresp{status='OK', chunk=Chunk}) ->
|
||||
{ok, Chunk};
|
||||
convert_checksum_list_resp(#mpb_checksumlistresp{status=Status}) ->
|
||||
|
|
|
@ -175,6 +175,12 @@ from_pb_request(#mpb_request{req_id=ReqID,
|
|||
offset=Offset,
|
||||
size=Size} = IR,
|
||||
{ReqID, {high_read_chunk, File, Offset, Size}};
|
||||
from_pb_request(#mpb_request{req_id=ReqID,
|
||||
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
|
||||
#mpb_trimchunkreq{file=File,
|
||||
offset=Offset,
|
||||
size=Size} = IR,
|
||||
{ReqID, {high_trim_chunk, File, Offset, Size}};
|
||||
from_pb_request(#mpb_request{req_id=ReqID,
|
||||
checksum_list=IR=#mpb_checksumlistreq{}}) ->
|
||||
#mpb_checksumlistreq{file=File} = IR,
|
||||
|
@ -643,6 +649,18 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
|
|||
_Else ->
|
||||
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
|
||||
end;
|
||||
to_pb_response(ReqID, {high_trim_chunk, _File, _Offset, _Size}, Resp) ->
|
||||
case Resp of
|
||||
ok ->
|
||||
#mpb_response{req_id=ReqID,
|
||||
trim_chunk=#mpb_trimchunkresp{status='OK'}};
|
||||
{error, _}=Error ->
|
||||
Status = conv_from_status(Error),
|
||||
#mpb_response{req_id=ReqID,
|
||||
trim_chunk=#mpb_trimchunkresp{status=Status}};
|
||||
_Else ->
|
||||
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
|
||||
end;
|
||||
to_pb_response(ReqID, {high_checksum_list, _File}, Resp) ->
|
||||
case Resp of
|
||||
{ok, Chunk} ->
|
||||
|
|
|
@ -88,6 +88,17 @@ smoke_test2() ->
|
|||
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
|
||||
true = is_integer(File1Size),
|
||||
|
||||
[begin
|
||||
%% ok = ?C:trim_chunk(Clnt, Fl, Off, Sz)
|
||||
%% This gets an error as trim API is still a stub
|
||||
?assertMatch({bummer,
|
||||
{throw,
|
||||
{error, bad_joss_taipan_fixme},
|
||||
_Boring_stack_trace}},
|
||||
?C:trim_chunk(Clnt, Fl, Off, Sz))
|
||||
end || {Ch, Fl, Off, Sz} <- Reads],
|
||||
?debugVal(?C:list_files(Clnt)),
|
||||
|
||||
ok
|
||||
after
|
||||
(catch ?C:quit(Clnt))
|
||||
|
|
Loading…
Reference in a new issue