Allow read_chunk() to return partial chunks #18

Merged
kuenishi merged 2 commits from ku/read-all-valid-chunks into master 2015-10-19 07:25:43 +00:00
15 changed files with 197 additions and 129 deletions
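
In short: a successful read_chunk() now returns {ok, [{FileName, Offset, Bytes, TaggedCsum}]} rather than {ok, Chunk}, so the server can hand back one or more valid sub-chunks of the requested range. A minimal sketch of a caller adapting to the new shape (read_all/4 is a hypothetical helper, not part of this diff; it assumes the tuple format shown in the changes below):

    %% Hypothetical helper: flatten the multi-chunk read result back
    %% into one binary.
    read_all(C, File, Offset, Size) ->
        case machi_cr_client:read_chunk(C, File, Offset, Size) of
            {ok, Chunks} ->
                %% Each element is {FileName, Offset, Bytes, TaggedCsum}.
                {ok, iolist_to_binary([Bytes || {_F, _Off, Bytes, _Cs} <- Chunks])};
            {error, _}=Err ->
                Err
        end.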

View file

@@ -9,7 +9,7 @@
  {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
  {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
  {node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
- {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.78"}}}
+ {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}}
 ]}.
 {sub_dirs, ["rel", "apps/machi"]}.

View file

@@ -86,6 +86,14 @@ message Mpb_ChunkCSum {
     optional bytes csum = 2;
 }

+message Mpb_Chunk {
+    required uint64 offset = 1;
+    required string file_name = 2;
+    required bytes chunk = 3;
+    // TODO: must be required, in future?
+    optional Mpb_ChunkCSum csum = 4;
+}
+
 // epoch_id() type
 message Mpb_EpochID {
     required uint32 epoch_number = 1;
@@ -177,10 +185,7 @@ message Mpb_AppendChunkResp {
 // High level API: write_chunk() request & response

 message Mpb_WriteChunkReq {
-    required string file = 1;
-    required uint64 offset = 2;
-    required bytes chunk = 3;
-    required Mpb_ChunkCSum csum = 4;
+    required Mpb_Chunk chunk = 1;
 }

 message Mpb_WriteChunkResp {
@@ -190,33 +195,27 @@ message Mpb_WriteChunkResp {
 // High level API: read_chunk() request & response

 message Mpb_ReadChunkReq {
-    required string file = 1;
-    required uint64 offset = 2;
-    required uint32 size = 3;
+    required Mpb_ChunkPos chunk_pos = 1;

     // Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
     // TODO: not implemented yet.
-    optional uint32 flag_no_checksum = 4 [default=0];
+    optional uint32 flag_no_checksum = 2 [default=0];

     // Use flag_no_chunk=non-zero to skip returning the chunk (which
     // only makes sense if flag_no_checksum is not set).
     // TODO: not implemented yet.
-    optional uint32 flag_no_chunk = 5 [default=0];
+    optional uint32 flag_no_chunk = 3 [default=0];
 }

 message Mpb_ReadChunkResp {
     required Mpb_GeneralStatusCode status = 1;
-    optional bytes chunk = 2;
-    optional Mpb_ChunkCSum csum = 3;
+    repeated Mpb_Chunk chunks = 2;
 }

 // High level API: trim_chunk() request & response

 message Mpb_TrimChunkReq {
-    required string file = 1;
-    required uint64 offset = 2;
-    required uint32 size = 3;
+    required Mpb_ChunkPos chunk_pos = 1;
 }

 message Mpb_TrimChunkResp {
@@ -390,10 +389,7 @@ message Mpb_LL_AppendChunkResp {
 message Mpb_LL_WriteChunkReq {
     required Mpb_EpochID epoch_id = 1;
-    required string file = 2;
-    required uint64 offset = 3;
-    required bytes chunk = 4;
-    required Mpb_ChunkCSum csum = 5;
+    required Mpb_Chunk chunk = 2;
 }

 message Mpb_LL_WriteChunkResp {
@@ -404,24 +400,21 @@ message Mpb_LL_WriteChunkResp {
 message Mpb_LL_ReadChunkReq {
     required Mpb_EpochID epoch_id = 1;
-    required string file = 2;
-    required uint64 offset = 3;
-    required uint32 size = 4;
+    required Mpb_ChunkPos chunk_pos = 2;

     // Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
     // TODO: not implemented yet.
-    optional uint32 flag_no_checksum = 5 [default=0];
+    optional uint32 flag_no_checksum = 3 [default=0];

     // Use flag_no_chunk=non-zero to skip returning the chunk (which
     // only makes sense if flag_checksum is not set).
     // TODO: not implemented yet.
-    optional uint32 flag_no_chunk = 6 [default=0];
+    optional uint32 flag_no_chunk = 4 [default=0];
 }

 message Mpb_LL_ReadChunkResp {
     required Mpb_GeneralStatusCode status = 1;
-    optional bytes chunk = 2;
-    optional Mpb_ChunkCSum csum = 3;
+    repeated Mpb_Chunk chunks = 2;
 }

 // Low level API: checksum_list()
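
The .proto change above factors file/offset/size into the shared Mpb_Chunk and Mpb_ChunkPos messages and makes read responses carry a repeated chunks field. A sketch of walking that field with the protobuffs-generated records (decode_chunks/1 is hypothetical; it assumes the machi_pb.hrl records used by the client code further down):

    %% Hypothetical decoder over the repeated `chunks` field; mirrors
    %% convert_read_chunk_resp/1 in the high-level client diff below.
    decode_chunks(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
        [{File, Offset, Bin} ||
            #mpb_chunk{file_name=File, offset=Offset, chunk=Bin} <- PB_Chunks].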

View file

@@ -73,8 +73,13 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
         {ok, FH} ->
             File = re:replace(Path, ".*/", "", [{return, binary}]),
             try
-                ReadChunk = fun(_File, Offset, Size) ->
-                                    file:pread(FH, Offset, Size)
+                ReadChunk = fun(F, Offset, Size) ->
+                                    case file:pread(FH, Offset, Size) of
+                                        {ok, Bin} ->
+                                            {ok, [{F, Offset, Bin, undefined}]};
+                                        Err ->
+                                            Err
+                                    end
                             end,
                 verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
             after
@@ -112,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
 verify_chunk_checksum(File, ReadChunk) ->
     fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
             case ReadChunk(File, Offset, Size) of
-                {ok, Chunk} ->
+                {ok, [{_, Offset, Chunk, _}]} ->
                     CSum2 = machi_util:checksum_chunk(Chunk),
                     if CSum == CSum2 ->
                             Acc;

View file

@@ -323,9 +323,10 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
                      _ -> ok
                  end,
                  _T1 = os:timestamp(),
-                 {ok, Chunk} = machi_proxy_flu1_client:read_chunk(
-                                 SrcP, EpochID, File, Offset, Size,
-                                 ?SHORT_TIMEOUT),
+                 {ok, [{_, Offset, Chunk, _}]} =
+                     machi_proxy_flu1_client:read_chunk(
+                       SrcP, EpochID, File, Offset, Size,
+                       ?SHORT_TIMEOUT),
                  _T2 = os:timestamp(),
                  <<_Tag:1/binary, CSum/binary>> = TaggedCSum,
                  case machi_util:checksum_chunk(Chunk) of

View file

@@ -529,9 +529,12 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
     ConsistencyMode = P#projection_v1.mode,
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
                             File, Offset, Size, ?TIMEOUT) of
-        {ok, Chunk} when byte_size(Chunk) == Size ->
-            {reply, {ok, Chunk}, S};
+        {ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size ->
+            {reply, {ok, Chunks}, S};
+        {ok, Chunks} when is_list(Chunks) ->
+            {reply, {ok, Chunks}, S};
         {ok, BadChunk} ->
+            %% TODO cleaner handling of bad chunks
             exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
                   got, byte_size(BadChunk)});
         {error, bad_arg} = BadArg ->
@@ -596,7 +599,7 @@ read_repair2(cp_mode=ConsistencyMode,
     Tail = lists:last(readonly_flus(P)),
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
                             File, Offset, Size, ?TIMEOUT) of
-        {ok, Chunk} when byte_size(Chunk) == Size ->
+        {ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size ->
             ToRepair = mutation_flus(P) -- [Tail],
             read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
                          Size, Depth, STime, S);
@@ -620,10 +623,23 @@ read_repair2(ap_mode=ConsistencyMode,
              #state{proj=P}=S) ->
     Eligible = mutation_flus(P),
     case try_to_find_chunk(Eligible, File, Offset, Size, S) of
-        {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
+        {ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size ->
             ToRepair = mutation_flus(P) -- [GotItFrom],
-            read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
-                         Offset, Size, Depth, STime, S);
+            %% TODO: stop matching single-size list
+            %% {RepairedChunks, S2} =
+            %%     lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) ->
+            %%                         Size0 = byte_size(Chunk0),
+            %%                         {reply, {ok, Chunk1}, S1} =
+            %%                             read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File,
+            %%                                          Offset0, Size0, Depth, STime, S0),
+            %%                         {[{File, Offset0, Chunk1, Csum}|Chunks0], S1}
+            %%                 end,
+            %%                 {[], S}, Chunks),
+            %% {reply, {ok, RepairedChunks}, S2};
+            {reply, {ok, RepairedChunk}, S2}
+                = read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0,
+                               Offset0, Size, Depth, STime, S),
+            {reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2};
         {ok, BadChunk, _GotItFrom} ->
             exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
                   Offset, Size, got, byte_size(BadChunk)});
@@ -856,14 +872,14 @@ try_to_find_chunk(Eligible, File, Offset, Size,
                    Proxy = orddict:fetch(FLU, PD),
                    case ?FLU_PC:read_chunk(Proxy, EpochID,
                                            File, Offset, Size) of
-                       {ok, Chunk} when byte_size(Chunk) == Size ->
-                           {FLU, {ok, Chunk}};
+                       {ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
+                           {FLU, {ok, Chunks}};
                        Else ->
                            {FLU, Else}
                    end
            end,
     Rs = run_middleworker_job(Work, Eligible, Timeout),
-    case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
+    case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
         [{FoundFLU, {ok, Chunk}}|_] ->
             {ok, Chunk, FoundFLU};
         [] ->
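
The commented-out foldl above sketches how ap_mode read repair could handle a genuinely multi-chunk result; the merged code still matches a single-element list. A cleaned-up version of that TODO (hypothetical, not merged here):

    %% Hypothetical generalization of the TODO above: repair each returned
    %% chunk in turn, threading the gen_server state through the fold.
    repair_chunks(ToRepair, ReturnMode, GotItFrom, Chunks, Depth, STime, S) ->
        lists:foldl(
          fun({File, Off, Chunk, Csum}, {Acc, S0}) ->
                  Size = byte_size(Chunk),
                  {reply, {ok, Repaired}, S1} =
                      read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom],
                                   File, Off, Size, Depth, STime, S0),
                  {[{File, Off, Repaired, Csum}|Acc], S1}
          end, {[], S}, Chunks).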

View file

@@ -132,8 +132,10 @@ sync(_Pid, Type) ->
 % @doc Read file at offset for length
 -spec read(Pid :: pid(),
            Offset :: non_neg_integer(),
-           Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
-                                           {error, Reason :: term()}.
+           Length :: non_neg_integer()) ->
+       {ok, [{Filename :: string(), Offset :: non_neg_integer(),
+              Data :: binary(), Checksum :: binary()}]} |
+       {error, Reason :: term()}.
 read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
                                andalso is_integer(Length) andalso Length > 0 ->
     gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
@@ -279,14 +281,19 @@ handle_call({read, Offset, Length}, _From,
                     undefined
             end,
-    {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
-                         {ok, Bytes, Csum} ->
-                             {{ok, Bytes, Csum}, Err};
-                         eof ->
-                             {{error, not_written}, Err + 1};
-                         Error ->
-                             {Error, Err + 1}
-                     end,
+    {Resp, NewErr} =
+        case handle_read(FH, F, Checksum, Offset, Length, U) of
+            {ok, Chunks} ->
+                %% Kludge to wrap read result in tuples, to support fragmented read
+                %% XXX FIXME
+                %% For now we are omiting the checksum data because it blows up
+                %% protobufs.
+                {{ok, Chunks}, Err};
+            eof ->
+                {{error, not_written}, Err + 1};
+            Error ->
+                {Error, Err + 1}
+        end,
     {reply, Resp, State#state{reads = {T+1, NewErr}}};

 %%% WRITES
@@ -552,11 +559,11 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
                           [Bad, Ck]),
             {error, bad_checksum};
         TaggedCsum ->
-            {ok, Bytes, TaggedCsum};
-        %% XXX FIXME: Should we return something other than
-        %% {ok, ....} in this case?
+            {ok, [{Filename, Offset, Bytes, TaggedCsum}]};
         OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
-            {ok, Bytes, OtherCsum}
+            %% XXX FIXME: Should we return something other than
+            %% {ok, ....} in this case?
+            {ok, [{Filename, Offset, Bytes, OtherCsum}]}
     end;
 {ok, Partial} ->
     lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@@ -608,7 +615,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
             lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
                           [Offset, Filename]),
             {error, server_insanity};
-        {ok, _, _} ->
+        {ok, _} ->
             {ok, U};
         _ ->
             {error, written}
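
Callers of machi_file_proxy:read/3 that still want the old 3-tuple can unwrap a single-chunk result, as the EQC model later in this PR does (a sketch; a fragmented, multi-element result would need real handling):

    %% Sketch: unwrap a single-chunk read back to the old {ok, Data, Csum}.
    case machi_file_proxy:read(Pid, Offset, Length) of
        {ok, [{_File, Offset, Data, Csum}]} -> {ok, Data, Csum};
        {error, Reason}                     -> {error, Reason}
    end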

View file

@@ -534,10 +534,7 @@ do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
         ok ->
             {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
             case machi_file_proxy:read(Pid, Offset, Size) of
-                %% XXX FIXME
-                %% For now we are omiting the checksum data because it blows up
-                %% protobufs.
-                {ok, Data, _Csum} -> {ok, Data};
+                {ok, Chunks} -> {ok, Chunks};
                 Other -> Other
             end;
         _ ->

View file

@@ -260,10 +260,11 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
     try
         ReqID = <<Index:64/big, Count:64/big>>,
         CSumT = convert_csum_req(CSum, Chunk),
-        Req = #mpb_writechunkreq{file=File,
-                                 offset=Offset,
-                                 chunk=Chunk,
-                                 csum=CSumT},
+        Req = #mpb_writechunkreq{chunk=
+                                     #mpb_chunk{chunk=Chunk,
+                                                file_name=File,
+                                                offset=Offset,
+                                                csum=CSumT}},
         R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
                            write_chunk=Req},
         Bin1a = machi_pb:encode_mpb_request(R1a),
@@ -285,9 +286,9 @@ do_send_sync2({read_chunk, File, Offset, Size},
               #state{sock=Sock, sock_id=Index, count=Count}=S) ->
     try
         ReqID = <<Index:64/big, Count:64/big>>,
-        Req = #mpb_readchunkreq{file=File,
-                                offset=Offset,
-                                size=Size},
+        Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
+                                                        offset=Offset,
+                                                        chunk_size=Size}},
         R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
                            read_chunk=Req},
         Bin1a = machi_pb:encode_mpb_request(R1a),
@@ -309,9 +310,9 @@ do_send_sync2({trim_chunk, File, Offset, Size},
               #state{sock=Sock, sock_id=Index, count=Count}=S) ->
     try
         ReqID = <<Index:64/big, Count:64/big>>,
-        Req = #mpb_trimchunkreq{file=File,
-                                offset=Offset,
-                                size=Size},
+        Req = #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
+                                                        offset=Offset,
+                                                        chunk_size=Size}},
         R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
                            trim_chunk=Req},
         Bin1a = machi_pb:encode_mpb_request(R1a),
@@ -415,8 +416,16 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
 convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
     convert_general_status_code(Status).

-convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
-    {ok, Chunk};
+convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
+    Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
+                                      file_name=File,
+                                      chunk=Chunk,
+                                      csum=#mpb_chunkcsum{type=T, csum=Ck}}) ->
+                               %% TODO: cleanup export
+                               Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
+                               {File, Offset, Chunk, Csum}
+                       end, PB_Chunks),
+    {ok, Chunks};
 convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
     convert_general_status_code(Status).

View file

@@ -15,8 +15,8 @@
 %% KIND, either express or implied. See the License for the
 %% specific language governing permissions and limitations
 %% under the License.
-%%
 %% -------------------------------------------------------------------
+%%

 -module(machi_pb_translate).
@@ -37,6 +37,9 @@
           to_pb_response/3
          ]).

+%% TODO: fixme cleanup
+-export([conv_to_csum_tag/1]).
+
 from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
                    echo=#mpb_echoreq{message=Msg}}) ->
@@ -62,10 +65,10 @@ from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
                    write_chunk=#mpb_ll_writechunkreq{
                      epoch_id=PB_EpochID,
-                     file=File,
-                     offset=Offset,
-                     chunk=Chunk,
-                     csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) ->
+                     chunk=#mpb_chunk{file_name=File,
+                                      offset=Offset,
+                                      chunk=Chunk,
+                                      csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) ->
     EpochID = conv_to_epoch_id(PB_EpochID),
     CSum_tag = conv_to_csum_tag(CSum_type),
     {ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
@@ -73,14 +76,15 @@ from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
                    read_chunk=#mpb_ll_readchunkreq{
                      epoch_id=PB_EpochID,
-                     file=File,
-                     offset=Offset,
-                     size=Size,
+                     chunk_pos=ChunkPos,
                      flag_no_checksum=PB_GetNoChecksum,
                      flag_no_chunk=PB_GetNoChunk}}) ->
     EpochID = conv_to_epoch_id(PB_EpochID),
     Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
             {no_chunk, conv_to_boolean(PB_GetNoChunk)}],
+    #mpb_chunkpos{file_name=File,
+                  offset=Offset,
+                  chunk_size=Size} = ChunkPos,
     {ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
 from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
@@ -163,23 +167,23 @@ from_pb_request(#mpb_request{req_id=ReqID,
                  ChunkExtra}};
 from_pb_request(#mpb_request{req_id=ReqID,
                              write_chunk=IR=#mpb_writechunkreq{}}) ->
-    #mpb_writechunkreq{file=File,
-                       offset=Offset,
-                       chunk=Chunk,
-                       csum=CSum} = IR,
+    #mpb_writechunkreq{chunk=#mpb_chunk{file_name=File,
+                                        offset=Offset,
+                                        chunk=Chunk,
+                                        csum=CSum}} = IR,
     TaggedCSum = make_tagged_csum(CSum, Chunk),
     {ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
 from_pb_request(#mpb_request{req_id=ReqID,
                              read_chunk=IR=#mpb_readchunkreq{}}) ->
-    #mpb_readchunkreq{file=File,
-                      offset=Offset,
-                      size=Size} = IR,
+    #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
+                                              offset=Offset,
+                                              chunk_size=Size}} = IR,
     {ReqID, {high_read_chunk, File, Offset, Size}};
 from_pb_request(#mpb_request{req_id=ReqID,
                              trim_chunk=IR=#mpb_trimchunkreq{}}) ->
-    #mpb_trimchunkreq{file=File,
-                      offset=Offset,
-                      size=Size} = IR,
+    #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
+                                              offset=Offset,
+                                              chunk_size=Size}} = IR,
     {ReqID, {high_trim_chunk, File, Offset, Size}};
 from_pb_request(#mpb_request{req_id=ReqID,
                              checksum_list=IR=#mpb_checksumlistreq{}}) ->
@@ -229,10 +233,17 @@ from_pb_response(#mpb_ll_response{
 from_pb_response(#mpb_ll_response{
                     req_id=ReqID,
                     read_chunk=#mpb_ll_readchunkresp{status=Status,
-                                                     chunk=Chunk}}) ->
+                                                     chunks=PB_Chunks}}) ->
     case Status of
         'OK' ->
-            {ReqID, {ok, Chunk}};
+            Chunks = lists:map(fun(#mpb_chunk{file_name=File,
+                                              offset=Offset,
+                                              chunk=Bytes,
+                                              csum=#mpb_chunkcsum{type=T,csum=Ck}}) ->
+                                       Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
+                                       {File, Offset, Bytes, Csum}
+                               end, PB_Chunks),
+            {ReqID, {ok, Chunks}};
         _ ->
             {ReqID, machi_pb_high_client:convert_general_status_code(Status)}
     end;
@@ -367,10 +378,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
     #mpb_ll_request{req_id=ReqID, do_not_alter=2,
                     write_chunk=#mpb_ll_writechunkreq{
                       epoch_id=PB_EpochID,
-                      file=File,
-                      offset=Offset,
-                      chunk=Chunk,
-                      csum=PB_CSum}};
+                      chunk=#mpb_chunk{file_name=File,
+                                       offset=Offset,
+                                       chunk=Chunk,
+                                       csum=PB_CSum}}};
 to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
     %% TODO: stop ignoring Opts ^_^
     PB_EpochID = conv_from_epoch_id(EpochID),
@@ -378,9 +389,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
       req_id=ReqID, do_not_alter=2,
       read_chunk=#mpb_ll_readchunkreq{
        epoch_id=PB_EpochID,
-        file=File,
-        offset=Offset,
-        size=Size}};
+        chunk_pos=#mpb_chunkpos{
+          file_name=File,
+          offset=Offset,
+          chunk_size=Size}}};
 to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
     PB_EpochID = conv_from_epoch_id(EpochID),
     #mpb_ll_request{req_id=ReqID, do_not_alter=2,
@@ -466,12 +478,18 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
                      write_chunk=#mpb_ll_writechunkresp{status=Status}};
 to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
     case Resp of
-        {ok, Chunk} ->
-            CSum = undefined, % TODO not implemented
+        {ok, Chunks} ->
+            PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
+                                          {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
+                                          #mpb_chunk{file_name=File,
+                                                     offset=Offset,
+                                                     chunk=Bytes,
+                                                     csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
+                                                                         csum=Ck}}
+                                  end, Chunks),
             #mpb_ll_response{req_id=ReqID,
                              read_chunk=#mpb_ll_readchunkresp{status='OK',
-                                                              chunk=Chunk,
-                                                              csum=CSum}};
+                                                              chunks=PB_Chunks}};
         {error, _}=Error ->
             Status = conv_from_status(Error),
             #mpb_ll_response{req_id=ReqID,
@@ -638,10 +656,19 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
     end;
 to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
     case Resp of
-        {ok, Chunk} ->
+        {ok, Chunks} ->
+            MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
+                                          {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
+                                          io:format(user, "~p oom~n", [Csum]),
+                                          #mpb_chunk{
+                                             offset=Offset,
+                                             file_name=File,
+                                             chunk=Bytes,
+                                             csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
+                                  end, Chunks),
             #mpb_response{req_id=ReqID,
                           read_chunk=#mpb_readchunkresp{status='OK',
-                                                        chunk=Chunk}};
+                                                        chunks=MpbChunks}};
         {error, _}=Error ->
             Status = conv_from_status(Error),
             #mpb_response{req_id=ReqID,

View file

@@ -113,12 +113,13 @@ smoke_test2() ->
             Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
             {error, bad_checksum} =
                 machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
-            {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
+            {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
             {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
                                                                  private),
             %% Verify that the client's CR wrote to all of them.
-            [{ok, Chunk1} = machi_flu1_client:read_chunk(
-                              Host, PortBase+X, EpochID, File1, Off1, Size1) ||
+            [{ok, [{_, Off1, Chunk1, _}]} =
+                 machi_flu1_client:read_chunk(
+                   Host, PortBase+X, EpochID, File1, Off1, Size1) ||
                 X <- [0,1,2] ],

             %% Test read repair: Manually write to head, then verify that
@@ -129,8 +130,8 @@ smoke_test2() ->
                                              File1, FooOff1, Size1) || X <- [0,1,2] ],
             ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
                                                File1, FooOff1, Chunk1),
-            {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
-            [{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk(
+            {ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
+            [{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
                                      Host, PortBase+X, EpochID,
                                      File1, FooOff1, Size1)} || X <- [0,1,2] ],
@@ -140,8 +141,8 @@ smoke_test2() ->
             Size2 = size(Chunk2),
             ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
                                                File1, FooOff2, Chunk2),
-            {ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
-            [{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
+            {ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
+            [{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
                                      Host, PortBase+X, EpochID,
                                      File1, FooOff2, Size2)} || X <- [0,1,2] ],
@@ -165,7 +166,8 @@ smoke_test2() ->
             {ok, {Off10,Size10,File10}} =
                 machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
                                                    Extra10 * Size10),
-            {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10),
+            {ok, [{_, Off10, Chunk10, _}]} =
+                machi_cr_client:read_chunk(C1, File10, Off10, Size10),
             [begin
                  Offx = Off10 + (Seq * Size10),
                  %% TODO: uncomment written/not_written enforcement is available.
@@ -173,8 +175,8 @@ smoke_test2() ->
                  %%                                Offx, Size10),
                  {ok, {Offx,Size10,File10}} =
                      machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
-                 {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx,
-                                                            Size10)
+                 {ok, [{_, Offx, Chunk10, _}]} =
+                     machi_cr_client:read_chunk(C1, File10, Offx, Size10)
             end || Seq <- lists:seq(1, Extra10)],
             {ok, {Off11,Size11,File11}} =
                 machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@@ -196,7 +198,8 @@ witness_smoke_test2() ->
                  {error, {already_started, P1}} -> P1;
                  Other -> error(Other)
              end,
-    error_logger:tty(false),
+    %% TODO: I wonder why commenting this out makes this test pass
+    %% error_logger:tty(true),
     try
         Prefix = <<"pre">>,
        Chunk1 = <<"yochunk">>,
@@ -215,7 +218,7 @@ witness_smoke_test2() ->
         Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
         {error, bad_checksum} =
             machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
-        {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
+        {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),

         %% Stop 'b' and let the chain reset.
         ok = machi_flu_psup:stop_flu_package(b),
@@ -241,7 +244,8 @@ witness_smoke_test2() ->
         end,

         %% Chunk1 is still readable: not affected by wedged witness head.
-        {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
+        {ok, [{_, Off1, Chunk1, _}]} =
+            machi_cr_client:read_chunk(C1, File1, Off1, Size1),
         %% But because the head is wedged, an append will fail.
         {error, partition} =
             machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),

View file

@@ -201,7 +201,12 @@ read_post(S, [_Pid, Off, L], Res) ->
 read_next(S, _Res, _Args) -> S.

 read(Pid, Offset, Length) ->
-    machi_file_proxy:read(Pid, Offset, Length).
+    case machi_file_proxy:read(Pid, Offset, Length) of
+        {ok, [{_, Offset, Data, Csum}]} ->
+            {ok, Data, Csum};
+        E ->
+            E
+    end.

 %% write

View file

@@ -92,7 +92,7 @@ machi_file_proxy_test_() ->
      ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
      ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
      ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
-     ?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
+     ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
      ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
      ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
      ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))

View file

@@ -97,8 +97,8 @@ flu_smoke_test() ->
     {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
                                                   ?DUMMY_PV1_EPOCH,
                                                   Prefix, Chunk1),
-    {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+    {ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
                                      File1, Off1, Len1),
     {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
                                            ?DUMMY_PV1_EPOCH, File1),
     true = is_binary(KludgeBin),
@@ -151,7 +151,7 @@ flu_smoke_test() ->
                                           File2, Off2, Chunk2),
     {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
                                           BadFile, Off2, Chunk2),
-    {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+    {ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
                                      File2, Off2, Len2),
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
                                          ?DUMMY_PV1_EPOCH,

View file

@@ -80,7 +80,9 @@ smoke_test2() ->
              {iolist_to_binary(Chunk2), File2, Off2, Size2},
              {iolist_to_binary(Chunk3), File3, Off3, Size3}],
     [begin
-         {ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz)
+         File = binary_to_list(Fl),
+         ?assertMatch({ok, [{File, Off, Ch, _}]},
+                      ?C:read_chunk(Clnt, Fl, Off, Sz))
      end || {Ch, Fl, Off, Sz} <- Reads],

     {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),

View file

@@ -60,12 +60,14 @@ api_smoke_test() ->
         {ok, {MyOff,MySize,MyFile}} =
             ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
                               infinity),
-        {ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
+        {ok, [{_, MyOff, MyChunk, _}]} =
+            ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
         MyChunk2 = <<"my chunk data, yeah, again">>,
         {ok, {MyOff2,MySize2,MyFile2}} =
             ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
                                     MyChunk2, 4242, infinity),
-        {ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
+        {ok, [{_, MyOff2, MyChunk2, _}]} =
+            ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
         MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
         {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
                                                   Prefix, MyChunk_badcs),
@@ -240,7 +242,7 @@ flu_restart_test() ->
      (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
                                        <<"prefix">>, Data, 42, infinity)
      end,
-     fun(run) -> {ok, Data} =
+     fun(run) -> {ok, [{_, Off1, Data, _}]} =
                      ?MUT:read_chunk(Prox1, FakeEpoch,
                                      File1, Off1, Size1),
                  ok;