Allow read_chunk() to return partial chunks #18
15 changed files with 197 additions and 129 deletions
|
@ -9,7 +9,7 @@
|
||||||
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
||||||
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
||||||
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
||||||
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.78"}}}
|
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{sub_dirs, ["rel", "apps/machi"]}.
|
{sub_dirs, ["rel", "apps/machi"]}.
|
||||||
|
|
|
@ -86,6 +86,14 @@ message Mpb_ChunkCSum {
|
||||||
optional bytes csum = 2;
|
optional bytes csum = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message Mpb_Chunk {
|
||||||
|
required uint64 offset = 1;
|
||||||
|
required string file_name = 2;
|
||||||
|
required bytes chunk = 3;
|
||||||
|
// TODO: must be required, in future?
|
||||||
|
optional Mpb_ChunkCSum csum = 4;
|
||||||
|
}
|
||||||
|
|
||||||
// epoch_id() type
|
// epoch_id() type
|
||||||
message Mpb_EpochID {
|
message Mpb_EpochID {
|
||||||
required uint32 epoch_number = 1;
|
required uint32 epoch_number = 1;
|
||||||
|
@ -177,10 +185,7 @@ message Mpb_AppendChunkResp {
|
||||||
// High level API: write_chunk() request & response
|
// High level API: write_chunk() request & response
|
||||||
|
|
||||||
message Mpb_WriteChunkReq {
|
message Mpb_WriteChunkReq {
|
||||||
required string file = 1;
|
required Mpb_Chunk chunk = 1;
|
||||||
required uint64 offset = 2;
|
|
||||||
required bytes chunk = 3;
|
|
||||||
required Mpb_ChunkCSum csum = 4;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mpb_WriteChunkResp {
|
message Mpb_WriteChunkResp {
|
||||||
|
@ -190,33 +195,27 @@ message Mpb_WriteChunkResp {
|
||||||
// High level API: read_chunk() request & response
|
// High level API: read_chunk() request & response
|
||||||
|
|
||||||
message Mpb_ReadChunkReq {
|
message Mpb_ReadChunkReq {
|
||||||
required string file = 1;
|
required Mpb_ChunkPos chunk_pos = 1;
|
||||||
required uint64 offset = 2;
|
|
||||||
required uint32 size = 3;
|
|
||||||
|
|
||||||
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
|
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
|
||||||
// TODO: not implemented yet.
|
// TODO: not implemented yet.
|
||||||
optional uint32 flag_no_checksum = 4 [default=0];
|
optional uint32 flag_no_checksum = 2 [default=0];
|
||||||
|
|
||||||
// Use flag_no_chunk=non-zero to skip returning the chunk (which
|
// Use flag_no_chunk=non-zero to skip returning the chunk (which
|
||||||
// only makes sense if flag_no_checksum is not set).
|
// only makes sense if flag_no_checksum is not set).
|
||||||
// TODO: not implemented yet.
|
// TODO: not implemented yet.
|
||||||
optional uint32 flag_no_chunk = 5 [default=0];
|
optional uint32 flag_no_chunk = 3 [default=0];
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mpb_ReadChunkResp {
|
message Mpb_ReadChunkResp {
|
||||||
required Mpb_GeneralStatusCode status = 1;
|
required Mpb_GeneralStatusCode status = 1;
|
||||||
optional bytes chunk = 2;
|
repeated Mpb_Chunk chunks = 2;
|
||||||
optional Mpb_ChunkCSum csum = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// High level API: trim_chunk() request & response
|
// High level API: trim_chunk() request & response
|
||||||
|
|
||||||
message Mpb_TrimChunkReq {
|
message Mpb_TrimChunkReq {
|
||||||
required string file = 1;
|
required Mpb_ChunkPos chunk_pos = 1;
|
||||||
required uint64 offset = 2;
|
|
||||||
required uint32 size = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mpb_TrimChunkResp {
|
message Mpb_TrimChunkResp {
|
||||||
|
@ -390,10 +389,7 @@ message Mpb_LL_AppendChunkResp {
|
||||||
|
|
||||||
message Mpb_LL_WriteChunkReq {
|
message Mpb_LL_WriteChunkReq {
|
||||||
required Mpb_EpochID epoch_id = 1;
|
required Mpb_EpochID epoch_id = 1;
|
||||||
required string file = 2;
|
required Mpb_Chunk chunk = 2;
|
||||||
required uint64 offset = 3;
|
|
||||||
required bytes chunk = 4;
|
|
||||||
required Mpb_ChunkCSum csum = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mpb_LL_WriteChunkResp {
|
message Mpb_LL_WriteChunkResp {
|
||||||
|
@ -404,24 +400,21 @@ message Mpb_LL_WriteChunkResp {
|
||||||
|
|
||||||
message Mpb_LL_ReadChunkReq {
|
message Mpb_LL_ReadChunkReq {
|
||||||
required Mpb_EpochID epoch_id = 1;
|
required Mpb_EpochID epoch_id = 1;
|
||||||
required string file = 2;
|
required Mpb_ChunkPos chunk_pos = 2;
|
||||||
required uint64 offset = 3;
|
|
||||||
required uint32 size = 4;
|
|
||||||
|
|
||||||
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
|
// Use flag_no_checksum=non-zero to skip returning the chunk's checksum.
|
||||||
// TODO: not implemented yet.
|
// TODO: not implemented yet.
|
||||||
optional uint32 flag_no_checksum = 5 [default=0];
|
optional uint32 flag_no_checksum = 3 [default=0];
|
||||||
|
|
||||||
// Use flag_no_chunk=non-zero to skip returning the chunk (which
|
// Use flag_no_chunk=non-zero to skip returning the chunk (which
|
||||||
// only makes sense if flag_checksum is not set).
|
// only makes sense if flag_checksum is not set).
|
||||||
// TODO: not implemented yet.
|
// TODO: not implemented yet.
|
||||||
optional uint32 flag_no_chunk = 6 [default=0];
|
optional uint32 flag_no_chunk = 4 [default=0];
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mpb_LL_ReadChunkResp {
|
message Mpb_LL_ReadChunkResp {
|
||||||
required Mpb_GeneralStatusCode status = 1;
|
required Mpb_GeneralStatusCode status = 1;
|
||||||
optional bytes chunk = 2;
|
repeated Mpb_Chunk chunks = 2;
|
||||||
optional Mpb_ChunkCSum csum = 3;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Low level API: checksum_list()
|
// Low level API: checksum_list()
|
||||||
|
|
|
@ -73,8 +73,13 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
|
||||||
{ok, FH} ->
|
{ok, FH} ->
|
||||||
File = re:replace(Path, ".*/", "", [{return, binary}]),
|
File = re:replace(Path, ".*/", "", [{return, binary}]),
|
||||||
try
|
try
|
||||||
ReadChunk = fun(_File, Offset, Size) ->
|
ReadChunk = fun(F, Offset, Size) ->
|
||||||
file:pread(FH, Offset, Size)
|
case file:pread(FH, Offset, Size) of
|
||||||
|
{ok, Bin} ->
|
||||||
|
{ok, [{F, Offset, Bin, undefined}]};
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
|
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
|
||||||
after
|
after
|
||||||
|
@ -112,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
||||||
verify_chunk_checksum(File, ReadChunk) ->
|
verify_chunk_checksum(File, ReadChunk) ->
|
||||||
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||||
case ReadChunk(File, Offset, Size) of
|
case ReadChunk(File, Offset, Size) of
|
||||||
{ok, Chunk} ->
|
{ok, [{_, Offset, Chunk, _}]} ->
|
||||||
CSum2 = machi_util:checksum_chunk(Chunk),
|
CSum2 = machi_util:checksum_chunk(Chunk),
|
||||||
if CSum == CSum2 ->
|
if CSum == CSum2 ->
|
||||||
Acc;
|
Acc;
|
||||||
|
|
|
@ -323,9 +323,10 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
_T1 = os:timestamp(),
|
_T1 = os:timestamp(),
|
||||||
{ok, Chunk} = machi_proxy_flu1_client:read_chunk(
|
{ok, [{_, Offset, Chunk, _}]} =
|
||||||
SrcP, EpochID, File, Offset, Size,
|
machi_proxy_flu1_client:read_chunk(
|
||||||
?SHORT_TIMEOUT),
|
SrcP, EpochID, File, Offset, Size,
|
||||||
|
?SHORT_TIMEOUT),
|
||||||
_T2 = os:timestamp(),
|
_T2 = os:timestamp(),
|
||||||
<<_Tag:1/binary, CSum/binary>> = TaggedCSum,
|
<<_Tag:1/binary, CSum/binary>> = TaggedCSum,
|
||||||
case machi_util:checksum_chunk(Chunk) of
|
case machi_util:checksum_chunk(Chunk) of
|
||||||
|
|
|
@ -529,9 +529,12 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
|
||||||
ConsistencyMode = P#projection_v1.mode,
|
ConsistencyMode = P#projection_v1.mode,
|
||||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
||||||
File, Offset, Size, ?TIMEOUT) of
|
File, Offset, Size, ?TIMEOUT) of
|
||||||
{ok, Chunk} when byte_size(Chunk) == Size ->
|
{ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size ->
|
||||||
{reply, {ok, Chunk}, S};
|
{reply, {ok, Chunks}, S};
|
||||||
|
{ok, Chunks} when is_list(Chunks) ->
|
||||||
|
{reply, {ok, Chunks}, S};
|
||||||
{ok, BadChunk} ->
|
{ok, BadChunk} ->
|
||||||
|
%% TODO cleaner handling of bad chunks
|
||||||
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
|
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
|
||||||
got, byte_size(BadChunk)});
|
got, byte_size(BadChunk)});
|
||||||
{error, bad_arg} = BadArg ->
|
{error, bad_arg} = BadArg ->
|
||||||
|
@ -596,7 +599,7 @@ read_repair2(cp_mode=ConsistencyMode,
|
||||||
Tail = lists:last(readonly_flus(P)),
|
Tail = lists:last(readonly_flus(P)),
|
||||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
||||||
File, Offset, Size, ?TIMEOUT) of
|
File, Offset, Size, ?TIMEOUT) of
|
||||||
{ok, Chunk} when byte_size(Chunk) == Size ->
|
{ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size ->
|
||||||
ToRepair = mutation_flus(P) -- [Tail],
|
ToRepair = mutation_flus(P) -- [Tail],
|
||||||
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
|
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
|
||||||
Size, Depth, STime, S);
|
Size, Depth, STime, S);
|
||||||
|
@ -620,10 +623,23 @@ read_repair2(ap_mode=ConsistencyMode,
|
||||||
#state{proj=P}=S) ->
|
#state{proj=P}=S) ->
|
||||||
Eligible = mutation_flus(P),
|
Eligible = mutation_flus(P),
|
||||||
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
|
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
|
||||||
{ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
|
{ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size ->
|
||||||
ToRepair = mutation_flus(P) -- [GotItFrom],
|
ToRepair = mutation_flus(P) -- [GotItFrom],
|
||||||
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
|
%% TODO: stop matching single-size list
|
||||||
Offset, Size, Depth, STime, S);
|
%% {RepairedChunks, S2} =
|
||||||
|
%% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) ->
|
||||||
|
%% Size0 = byte_size(Chunk0),
|
||||||
|
%% {reply, {ok, Chunk1}, S1} =
|
||||||
|
%% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File,
|
||||||
|
%% Offset0, Size0, Depth, STime, S0),
|
||||||
|
%% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1}
|
||||||
|
%% end,
|
||||||
|
%% {[], S}, Chunks),
|
||||||
|
%% {reply, {ok, RepairedChunks}, S2};
|
||||||
|
{reply, {ok, RepairedChunk}, S2}
|
||||||
|
= read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0,
|
||||||
|
Offset0, Size, Depth, STime, S),
|
||||||
|
{reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2};
|
||||||
{ok, BadChunk, _GotItFrom} ->
|
{ok, BadChunk, _GotItFrom} ->
|
||||||
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
|
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
|
||||||
Offset, Size, got, byte_size(BadChunk)});
|
Offset, Size, got, byte_size(BadChunk)});
|
||||||
|
@ -856,14 +872,14 @@ try_to_find_chunk(Eligible, File, Offset, Size,
|
||||||
Proxy = orddict:fetch(FLU, PD),
|
Proxy = orddict:fetch(FLU, PD),
|
||||||
case ?FLU_PC:read_chunk(Proxy, EpochID,
|
case ?FLU_PC:read_chunk(Proxy, EpochID,
|
||||||
File, Offset, Size) of
|
File, Offset, Size) of
|
||||||
{ok, Chunk} when byte_size(Chunk) == Size ->
|
{ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
|
||||||
{FLU, {ok, Chunk}};
|
{FLU, {ok, Chunks}};
|
||||||
Else ->
|
Else ->
|
||||||
{FLU, Else}
|
{FLU, Else}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
Rs = run_middleworker_job(Work, Eligible, Timeout),
|
Rs = run_middleworker_job(Work, Eligible, Timeout),
|
||||||
case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
|
case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
|
||||||
[{FoundFLU, {ok, Chunk}}|_] ->
|
[{FoundFLU, {ok, Chunk}}|_] ->
|
||||||
{ok, Chunk, FoundFLU};
|
{ok, Chunk, FoundFLU};
|
||||||
[] ->
|
[] ->
|
||||||
|
|
|
@ -132,8 +132,10 @@ sync(_Pid, Type) ->
|
||||||
% @doc Read file at offset for length
|
% @doc Read file at offset for length
|
||||||
-spec read(Pid :: pid(),
|
-spec read(Pid :: pid(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
|
Length :: non_neg_integer()) ->
|
||||||
{error, Reason :: term()}.
|
{ok, [{Filename::string(), Offset :: non_neg_integer(),
|
||||||
|
Data :: binary(), Checksum :: binary()}]} |
|
||||||
|
{error, Reason :: term()}.
|
||||||
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
|
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
|
||||||
andalso is_integer(Length) andalso Length > 0 ->
|
andalso is_integer(Length) andalso Length > 0 ->
|
||||||
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
|
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
|
||||||
|
@ -279,14 +281,19 @@ handle_call({read, Offset, Length}, _From,
|
||||||
undefined
|
undefined
|
||||||
end,
|
end,
|
||||||
|
|
||||||
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
|
{Resp, NewErr} =
|
||||||
{ok, Bytes, Csum} ->
|
case handle_read(FH, F, Checksum, Offset, Length, U) of
|
||||||
{{ok, Bytes, Csum}, Err};
|
{ok, Chunks} ->
|
||||||
eof ->
|
%% Kludge to wrap read result in tuples, to support fragmented read
|
||||||
{{error, not_written}, Err + 1};
|
%% XXX FIXME
|
||||||
Error ->
|
%% For now we are omiting the checksum data because it blows up
|
||||||
{Error, Err + 1}
|
%% protobufs.
|
||||||
end,
|
{{ok, Chunks}, Err};
|
||||||
|
eof ->
|
||||||
|
{{error, not_written}, Err + 1};
|
||||||
|
Error ->
|
||||||
|
{Error, Err + 1}
|
||||||
|
end,
|
||||||
{reply, Resp, State#state{reads = {T+1, NewErr}}};
|
{reply, Resp, State#state{reads = {T+1, NewErr}}};
|
||||||
|
|
||||||
%%% WRITES
|
%%% WRITES
|
||||||
|
@ -542,7 +549,7 @@ handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
|
||||||
|
|
||||||
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
case file:pread(FHd, Offset, Size) of
|
case file:pread(FHd, Offset, Size) of
|
||||||
eof ->
|
eof ->
|
||||||
eof;
|
eof;
|
||||||
{ok, Bytes} when byte_size(Bytes) == Size ->
|
{ok, Bytes} when byte_size(Bytes) == Size ->
|
||||||
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
||||||
|
@ -552,11 +559,11 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
[Bad, Ck]),
|
[Bad, Ck]),
|
||||||
{error, bad_checksum};
|
{error, bad_checksum};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
{ok, Bytes, TaggedCsum};
|
{ok, [{Filename, Offset, Bytes, TaggedCsum}]};
|
||||||
%% XXX FIXME: Should we return something other than
|
|
||||||
%% {ok, ....} in this case?
|
|
||||||
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
|
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
|
||||||
{ok, Bytes, OtherCsum}
|
%% XXX FIXME: Should we return something other than
|
||||||
|
%% {ok, ....} in this case?
|
||||||
|
{ok, [{Filename, Offset, Bytes, OtherCsum}]}
|
||||||
end;
|
end;
|
||||||
{ok, Partial} ->
|
{ok, Partial} ->
|
||||||
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
|
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
|
||||||
|
@ -608,7 +615,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
|
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
|
||||||
[Offset, Filename]),
|
[Offset, Filename]),
|
||||||
{error, server_insanity};
|
{error, server_insanity};
|
||||||
{ok, _, _} ->
|
{ok, _} ->
|
||||||
{ok, U};
|
{ok, U};
|
||||||
_ ->
|
_ ->
|
||||||
{error, written}
|
{error, written}
|
||||||
|
|
|
@ -534,10 +534,7 @@ do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
|
||||||
ok ->
|
ok ->
|
||||||
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
|
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
|
||||||
case machi_file_proxy:read(Pid, Offset, Size) of
|
case machi_file_proxy:read(Pid, Offset, Size) of
|
||||||
%% XXX FIXME
|
{ok, Chunks} -> {ok, Chunks};
|
||||||
%% For now we are omiting the checksum data because it blows up
|
|
||||||
%% protobufs.
|
|
||||||
{ok, Data, _Csum} -> {ok, Data};
|
|
||||||
Other -> Other
|
Other -> Other
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -260,10 +260,11 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
|
||||||
try
|
try
|
||||||
ReqID = <<Index:64/big, Count:64/big>>,
|
ReqID = <<Index:64/big, Count:64/big>>,
|
||||||
CSumT = convert_csum_req(CSum, Chunk),
|
CSumT = convert_csum_req(CSum, Chunk),
|
||||||
Req = #mpb_writechunkreq{file=File,
|
Req = #mpb_writechunkreq{chunk=
|
||||||
offset=Offset,
|
#mpb_chunk{chunk=Chunk,
|
||||||
chunk=Chunk,
|
file_name=File,
|
||||||
csum=CSumT},
|
offset=Offset,
|
||||||
|
csum=CSumT}},
|
||||||
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
||||||
write_chunk=Req},
|
write_chunk=Req},
|
||||||
Bin1a = machi_pb:encode_mpb_request(R1a),
|
Bin1a = machi_pb:encode_mpb_request(R1a),
|
||||||
|
@ -285,9 +286,9 @@ do_send_sync2({read_chunk, File, Offset, Size},
|
||||||
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
||||||
try
|
try
|
||||||
ReqID = <<Index:64/big, Count:64/big>>,
|
ReqID = <<Index:64/big, Count:64/big>>,
|
||||||
Req = #mpb_readchunkreq{file=File,
|
Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
size=Size},
|
chunk_size=Size}},
|
||||||
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
||||||
read_chunk=Req},
|
read_chunk=Req},
|
||||||
Bin1a = machi_pb:encode_mpb_request(R1a),
|
Bin1a = machi_pb:encode_mpb_request(R1a),
|
||||||
|
@ -309,9 +310,9 @@ do_send_sync2({trim_chunk, File, Offset, Size},
|
||||||
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
||||||
try
|
try
|
||||||
ReqID = <<Index:64/big, Count:64/big>>,
|
ReqID = <<Index:64/big, Count:64/big>>,
|
||||||
Req = #mpb_trimchunkreq{file=File,
|
Req = #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
size=Size},
|
chunk_size=Size}},
|
||||||
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
||||||
trim_chunk=Req},
|
trim_chunk=Req},
|
||||||
Bin1a = machi_pb:encode_mpb_request(R1a),
|
Bin1a = machi_pb:encode_mpb_request(R1a),
|
||||||
|
@ -415,8 +416,16 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
|
||||||
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
|
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
|
||||||
convert_general_status_code(Status).
|
convert_general_status_code(Status).
|
||||||
|
|
||||||
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunk=Chunk}) ->
|
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
|
||||||
{ok, Chunk};
|
Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
|
||||||
|
file_name=File,
|
||||||
|
chunk=Chunk,
|
||||||
|
csum=#mpb_chunkcsum{type=T, csum=Ck}}) ->
|
||||||
|
%% TODO: cleanup export
|
||||||
|
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
|
||||||
|
{File, Offset, Chunk, Csum}
|
||||||
|
end, PB_Chunks),
|
||||||
|
{ok, Chunks};
|
||||||
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
|
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
|
||||||
convert_general_status_code(Status).
|
convert_general_status_code(Status).
|
||||||
|
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
%% KIND, either express or implied. See the License for the
|
%% KIND, either express or implied. See the License for the
|
||||||
%% specific language governing permissions and limitations
|
%% specific language governing permissions and limitations
|
||||||
%% under the License.
|
%% under the License.
|
||||||
%%
|
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
|
||||||
-module(machi_pb_translate).
|
-module(machi_pb_translate).
|
||||||
|
|
||||||
|
@ -37,6 +37,9 @@
|
||||||
to_pb_response/3
|
to_pb_response/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% TODO: fixme cleanup
|
||||||
|
-export([conv_to_csum_tag/1]).
|
||||||
|
|
||||||
from_pb_request(#mpb_ll_request{
|
from_pb_request(#mpb_ll_request{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
echo=#mpb_echoreq{message=Msg}}) ->
|
echo=#mpb_echoreq{message=Msg}}) ->
|
||||||
|
@ -62,10 +65,10 @@ from_pb_request(#mpb_ll_request{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
write_chunk=#mpb_ll_writechunkreq{
|
write_chunk=#mpb_ll_writechunkreq{
|
||||||
epoch_id=PB_EpochID,
|
epoch_id=PB_EpochID,
|
||||||
file=File,
|
chunk=#mpb_chunk{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
chunk=Chunk,
|
chunk=Chunk,
|
||||||
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}) ->
|
csum=#mpb_chunkcsum{type=CSum_type, csum=CSum}}}}) ->
|
||||||
EpochID = conv_to_epoch_id(PB_EpochID),
|
EpochID = conv_to_epoch_id(PB_EpochID),
|
||||||
CSum_tag = conv_to_csum_tag(CSum_type),
|
CSum_tag = conv_to_csum_tag(CSum_type),
|
||||||
{ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
|
{ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}};
|
||||||
|
@ -73,14 +76,15 @@ from_pb_request(#mpb_ll_request{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
read_chunk=#mpb_ll_readchunkreq{
|
read_chunk=#mpb_ll_readchunkreq{
|
||||||
epoch_id=PB_EpochID,
|
epoch_id=PB_EpochID,
|
||||||
file=File,
|
chunk_pos=ChunkPos,
|
||||||
offset=Offset,
|
|
||||||
size=Size,
|
|
||||||
flag_no_checksum=PB_GetNoChecksum,
|
flag_no_checksum=PB_GetNoChecksum,
|
||||||
flag_no_chunk=PB_GetNoChunk}}) ->
|
flag_no_chunk=PB_GetNoChunk}}) ->
|
||||||
EpochID = conv_to_epoch_id(PB_EpochID),
|
EpochID = conv_to_epoch_id(PB_EpochID),
|
||||||
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
|
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
|
||||||
{no_chunk, conv_to_boolean(PB_GetNoChunk)}],
|
{no_chunk, conv_to_boolean(PB_GetNoChunk)}],
|
||||||
|
#mpb_chunkpos{file_name=File,
|
||||||
|
offset=Offset,
|
||||||
|
chunk_size=Size} = ChunkPos,
|
||||||
{ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
|
{ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
|
||||||
from_pb_request(#mpb_ll_request{
|
from_pb_request(#mpb_ll_request{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
|
@ -163,23 +167,23 @@ from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
ChunkExtra}};
|
ChunkExtra}};
|
||||||
from_pb_request(#mpb_request{req_id=ReqID,
|
from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
write_chunk=IR=#mpb_writechunkreq{}}) ->
|
write_chunk=IR=#mpb_writechunkreq{}}) ->
|
||||||
#mpb_writechunkreq{file=File,
|
#mpb_writechunkreq{chunk=#mpb_chunk{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
chunk=Chunk,
|
chunk=Chunk,
|
||||||
csum=CSum} = IR,
|
csum=CSum}} = IR,
|
||||||
TaggedCSum = make_tagged_csum(CSum, Chunk),
|
TaggedCSum = make_tagged_csum(CSum, Chunk),
|
||||||
{ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
|
{ReqID, {high_write_chunk, File, Offset, Chunk, TaggedCSum}};
|
||||||
from_pb_request(#mpb_request{req_id=ReqID,
|
from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
read_chunk=IR=#mpb_readchunkreq{}}) ->
|
read_chunk=IR=#mpb_readchunkreq{}}) ->
|
||||||
#mpb_readchunkreq{file=File,
|
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
size=Size} = IR,
|
chunk_size=Size}} = IR,
|
||||||
{ReqID, {high_read_chunk, File, Offset, Size}};
|
{ReqID, {high_read_chunk, File, Offset, Size}};
|
||||||
from_pb_request(#mpb_request{req_id=ReqID,
|
from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
|
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
|
||||||
#mpb_trimchunkreq{file=File,
|
#mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
size=Size} = IR,
|
chunk_size=Size}} = IR,
|
||||||
{ReqID, {high_trim_chunk, File, Offset, Size}};
|
{ReqID, {high_trim_chunk, File, Offset, Size}};
|
||||||
from_pb_request(#mpb_request{req_id=ReqID,
|
from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
checksum_list=IR=#mpb_checksumlistreq{}}) ->
|
checksum_list=IR=#mpb_checksumlistreq{}}) ->
|
||||||
|
@ -229,10 +233,17 @@ from_pb_response(#mpb_ll_response{
|
||||||
from_pb_response(#mpb_ll_response{
|
from_pb_response(#mpb_ll_response{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
read_chunk=#mpb_ll_readchunkresp{status=Status,
|
read_chunk=#mpb_ll_readchunkresp{status=Status,
|
||||||
chunk=Chunk}}) ->
|
chunks=PB_Chunks}}) ->
|
||||||
case Status of
|
case Status of
|
||||||
'OK' ->
|
'OK' ->
|
||||||
{ReqID, {ok, Chunk}};
|
Chunks = lists:map(fun(#mpb_chunk{file_name=File,
|
||||||
|
offset=Offset,
|
||||||
|
chunk=Bytes,
|
||||||
|
csum=#mpb_chunkcsum{type=T,csum=Ck}}) ->
|
||||||
|
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
|
||||||
|
{File, Offset, Bytes, Csum}
|
||||||
|
end, PB_Chunks),
|
||||||
|
{ReqID, {ok, Chunks}};
|
||||||
_ ->
|
_ ->
|
||||||
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
|
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
|
||||||
end;
|
end;
|
||||||
|
@ -367,10 +378,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
|
||||||
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
||||||
write_chunk=#mpb_ll_writechunkreq{
|
write_chunk=#mpb_ll_writechunkreq{
|
||||||
epoch_id=PB_EpochID,
|
epoch_id=PB_EpochID,
|
||||||
file=File,
|
chunk=#mpb_chunk{file_name=File,
|
||||||
offset=Offset,
|
offset=Offset,
|
||||||
chunk=Chunk,
|
chunk=Chunk,
|
||||||
csum=PB_CSum}};
|
csum=PB_CSum}}};
|
||||||
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
|
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
|
||||||
%% TODO: stop ignoring Opts ^_^
|
%% TODO: stop ignoring Opts ^_^
|
||||||
PB_EpochID = conv_from_epoch_id(EpochID),
|
PB_EpochID = conv_from_epoch_id(EpochID),
|
||||||
|
@ -378,9 +389,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
|
||||||
req_id=ReqID, do_not_alter=2,
|
req_id=ReqID, do_not_alter=2,
|
||||||
read_chunk=#mpb_ll_readchunkreq{
|
read_chunk=#mpb_ll_readchunkreq{
|
||||||
epoch_id=PB_EpochID,
|
epoch_id=PB_EpochID,
|
||||||
file=File,
|
chunk_pos=#mpb_chunkpos{
|
||||||
offset=Offset,
|
file_name=File,
|
||||||
size=Size}};
|
offset=Offset,
|
||||||
|
chunk_size=Size}}};
|
||||||
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
|
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
|
||||||
PB_EpochID = conv_from_epoch_id(EpochID),
|
PB_EpochID = conv_from_epoch_id(EpochID),
|
||||||
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
||||||
|
@ -466,12 +478,18 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
|
||||||
write_chunk=#mpb_ll_writechunkresp{status=Status}};
|
write_chunk=#mpb_ll_writechunkresp{status=Status}};
|
||||||
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
|
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
|
||||||
case Resp of
|
case Resp of
|
||||||
{ok, Chunk} ->
|
{ok, Chunks} ->
|
||||||
CSum = undefined, % TODO not implemented
|
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
|
||||||
|
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
|
||||||
|
#mpb_chunk{file_name=File,
|
||||||
|
offset=Offset,
|
||||||
|
chunk=Bytes,
|
||||||
|
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
|
||||||
|
csum=Ck}}
|
||||||
|
end, Chunks),
|
||||||
#mpb_ll_response{req_id=ReqID,
|
#mpb_ll_response{req_id=ReqID,
|
||||||
read_chunk=#mpb_ll_readchunkresp{status='OK',
|
read_chunk=#mpb_ll_readchunkresp{status='OK',
|
||||||
chunk=Chunk,
|
chunks=PB_Chunks}};
|
||||||
csum=CSum}};
|
|
||||||
{error, _}=Error ->
|
{error, _}=Error ->
|
||||||
Status = conv_from_status(Error),
|
Status = conv_from_status(Error),
|
||||||
#mpb_ll_response{req_id=ReqID,
|
#mpb_ll_response{req_id=ReqID,
|
||||||
|
@ -638,10 +656,19 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
|
||||||
end;
|
end;
|
||||||
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
|
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
|
||||||
case Resp of
|
case Resp of
|
||||||
{ok, Chunk} ->
|
{ok, Chunks} ->
|
||||||
|
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
|
||||||
|
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
|
||||||
|
io:format(user, "~p oom~n", [Csum]),
|
||||||
|
#mpb_chunk{
|
||||||
|
offset=Offset,
|
||||||
|
file_name=File,
|
||||||
|
chunk=Bytes,
|
||||||
|
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
|
||||||
|
end, Chunks),
|
||||||
#mpb_response{req_id=ReqID,
|
#mpb_response{req_id=ReqID,
|
||||||
read_chunk=#mpb_readchunkresp{status='OK',
|
read_chunk=#mpb_readchunkresp{status='OK',
|
||||||
chunk=Chunk}};
|
chunks=MpbChunks}};
|
||||||
{error, _}=Error ->
|
{error, _}=Error ->
|
||||||
Status = conv_from_status(Error),
|
Status = conv_from_status(Error),
|
||||||
#mpb_response{req_id=ReqID,
|
#mpb_response{req_id=ReqID,
|
||||||
|
|
|
@ -113,12 +113,13 @@ smoke_test2() ->
|
||||||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
||||||
{error, bad_checksum} =
|
{error, bad_checksum} =
|
||||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||||
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||||
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
|
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
|
||||||
private),
|
private),
|
||||||
%% Verify that the client's CR wrote to all of them.
|
%% Verify that the client's CR wrote to all of them.
|
||||||
[{ok, Chunk1} = machi_flu1_client:read_chunk(
|
[{ok, [{_, Off1, Chunk1, _}]} =
|
||||||
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
|
machi_flu1_client:read_chunk(
|
||||||
|
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
|
||||||
X <- [0,1,2] ],
|
X <- [0,1,2] ],
|
||||||
|
|
||||||
%% Test read repair: Manually write to head, then verify that
|
%% Test read repair: Manually write to head, then verify that
|
||||||
|
@ -129,8 +130,8 @@ smoke_test2() ->
|
||||||
File1, FooOff1, Size1) || X <- [0,1,2] ],
|
File1, FooOff1, Size1) || X <- [0,1,2] ],
|
||||||
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
|
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
|
||||||
File1, FooOff1, Chunk1),
|
File1, FooOff1, Chunk1),
|
||||||
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
|
{ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
|
||||||
[{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk(
|
[{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
|
||||||
Host, PortBase+X, EpochID,
|
Host, PortBase+X, EpochID,
|
||||||
File1, FooOff1, Size1)} || X <- [0,1,2] ],
|
File1, FooOff1, Size1)} || X <- [0,1,2] ],
|
||||||
|
|
||||||
|
@ -140,8 +141,8 @@ smoke_test2() ->
|
||||||
Size2 = size(Chunk2),
|
Size2 = size(Chunk2),
|
||||||
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
|
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
|
||||||
File1, FooOff2, Chunk2),
|
File1, FooOff2, Chunk2),
|
||||||
{ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
|
{ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
|
||||||
[{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
|
[{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
|
||||||
Host, PortBase+X, EpochID,
|
Host, PortBase+X, EpochID,
|
||||||
File1, FooOff2, Size2)} || X <- [0,1,2] ],
|
File1, FooOff2, Size2)} || X <- [0,1,2] ],
|
||||||
|
|
||||||
|
@ -165,7 +166,8 @@ smoke_test2() ->
|
||||||
{ok, {Off10,Size10,File10}} =
|
{ok, {Off10,Size10,File10}} =
|
||||||
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
|
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
|
||||||
Extra10 * Size10),
|
Extra10 * Size10),
|
||||||
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10),
|
{ok, [{_, Off10, Chunk10, _}]} =
|
||||||
|
machi_cr_client:read_chunk(C1, File10, Off10, Size10),
|
||||||
[begin
|
[begin
|
||||||
Offx = Off10 + (Seq * Size10),
|
Offx = Off10 + (Seq * Size10),
|
||||||
%% TODO: uncomment written/not_written enforcement is available.
|
%% TODO: uncomment written/not_written enforcement is available.
|
||||||
|
@ -173,8 +175,8 @@ smoke_test2() ->
|
||||||
%% Offx, Size10),
|
%% Offx, Size10),
|
||||||
{ok, {Offx,Size10,File10}} =
|
{ok, {Offx,Size10,File10}} =
|
||||||
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
|
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
|
||||||
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx,
|
{ok, [{_, Offx, Chunk10, _}]} =
|
||||||
Size10)
|
machi_cr_client:read_chunk(C1, File10, Offx, Size10)
|
||||||
end || Seq <- lists:seq(1, Extra10)],
|
end || Seq <- lists:seq(1, Extra10)],
|
||||||
{ok, {Off11,Size11,File11}} =
|
{ok, {Off11,Size11,File11}} =
|
||||||
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
|
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
|
||||||
|
@ -196,7 +198,8 @@ witness_smoke_test2() ->
|
||||||
{error, {already_started, P1}} -> P1;
|
{error, {already_started, P1}} -> P1;
|
||||||
Other -> error(Other)
|
Other -> error(Other)
|
||||||
end,
|
end,
|
||||||
error_logger:tty(false),
|
%% TODO: I wonder why commenting this out makes this test pass
|
||||||
|
%% error_logger:tty(true),
|
||||||
try
|
try
|
||||||
Prefix = <<"pre">>,
|
Prefix = <<"pre">>,
|
||||||
Chunk1 = <<"yochunk">>,
|
Chunk1 = <<"yochunk">>,
|
||||||
|
@ -215,7 +218,7 @@ witness_smoke_test2() ->
|
||||||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
||||||
{error, bad_checksum} =
|
{error, bad_checksum} =
|
||||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||||
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||||
|
|
||||||
%% Stop 'b' and let the chain reset.
|
%% Stop 'b' and let the chain reset.
|
||||||
ok = machi_flu_psup:stop_flu_package(b),
|
ok = machi_flu_psup:stop_flu_package(b),
|
||||||
|
@ -241,7 +244,8 @@ witness_smoke_test2() ->
|
||||||
end,
|
end,
|
||||||
|
|
||||||
%% Chunk1 is still readable: not affected by wedged witness head.
|
%% Chunk1 is still readable: not affected by wedged witness head.
|
||||||
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
{ok, [{_, Off1, Chunk1, _}]} =
|
||||||
|
machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||||
%% But because the head is wedged, an append will fail.
|
%% But because the head is wedged, an append will fail.
|
||||||
{error, partition} =
|
{error, partition} =
|
||||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),
|
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),
|
||||||
|
|
|
@ -201,7 +201,12 @@ read_post(S, [_Pid, Off, L], Res) ->
|
||||||
read_next(S, _Res, _Args) -> S.
|
read_next(S, _Res, _Args) -> S.
|
||||||
|
|
||||||
read(Pid, Offset, Length) ->
|
read(Pid, Offset, Length) ->
|
||||||
machi_file_proxy:read(Pid, Offset, Length).
|
case machi_file_proxy:read(Pid, Offset, Length) of
|
||||||
|
{ok, [{_, Offset, Data, Csum}]} ->
|
||||||
|
{ok, Data, Csum};
|
||||||
|
E ->
|
||||||
|
E
|
||||||
|
end.
|
||||||
|
|
||||||
%% write
|
%% write
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ machi_file_proxy_test_() ->
|
||||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
||||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
||||||
?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
|
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
|
||||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
|
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
|
||||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
|
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
|
||||||
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
||||||
|
|
|
@ -97,8 +97,8 @@ flu_smoke_test() ->
|
||||||
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
|
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||||
?DUMMY_PV1_EPOCH,
|
?DUMMY_PV1_EPOCH,
|
||||||
Prefix, Chunk1),
|
Prefix, Chunk1),
|
||||||
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
{ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||||
File1, Off1, Len1),
|
File1, Off1, Len1),
|
||||||
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
|
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
|
||||||
?DUMMY_PV1_EPOCH, File1),
|
?DUMMY_PV1_EPOCH, File1),
|
||||||
true = is_binary(KludgeBin),
|
true = is_binary(KludgeBin),
|
||||||
|
@ -151,7 +151,7 @@ flu_smoke_test() ->
|
||||||
File2, Off2, Chunk2),
|
File2, Off2, Chunk2),
|
||||||
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||||
BadFile, Off2, Chunk2),
|
BadFile, Off2, Chunk2),
|
||||||
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
{ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||||
File2, Off2, Len2),
|
File2, Off2, Len2),
|
||||||
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
|
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
|
||||||
?DUMMY_PV1_EPOCH,
|
?DUMMY_PV1_EPOCH,
|
||||||
|
|
|
@ -80,7 +80,9 @@ smoke_test2() ->
|
||||||
{iolist_to_binary(Chunk2), File2, Off2, Size2},
|
{iolist_to_binary(Chunk2), File2, Off2, Size2},
|
||||||
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
|
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
|
||||||
[begin
|
[begin
|
||||||
{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz)
|
File = binary_to_list(Fl),
|
||||||
|
?assertMatch({ok, [{File, Off, Ch, _}]},
|
||||||
|
?C:read_chunk(Clnt, Fl, Off, Sz))
|
||||||
end || {Ch, Fl, Off, Sz} <- Reads],
|
end || {Ch, Fl, Off, Sz} <- Reads],
|
||||||
|
|
||||||
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
|
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
|
||||||
|
|
|
@ -60,12 +60,14 @@ api_smoke_test() ->
|
||||||
{ok, {MyOff,MySize,MyFile}} =
|
{ok, {MyOff,MySize,MyFile}} =
|
||||||
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
||||||
infinity),
|
infinity),
|
||||||
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
|
{ok, [{_, MyOff, MyChunk, _}]} =
|
||||||
|
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
|
||||||
MyChunk2 = <<"my chunk data, yeah, again">>,
|
MyChunk2 = <<"my chunk data, yeah, again">>,
|
||||||
{ok, {MyOff2,MySize2,MyFile2}} =
|
{ok, {MyOff2,MySize2,MyFile2}} =
|
||||||
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
|
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
|
||||||
MyChunk2, 4242, infinity),
|
MyChunk2, 4242, infinity),
|
||||||
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
|
{ok, [{_, MyOff2, MyChunk2, _}]} =
|
||||||
|
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
|
||||||
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
|
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
|
||||||
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
|
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
|
||||||
Prefix, MyChunk_badcs),
|
Prefix, MyChunk_badcs),
|
||||||
|
@ -240,7 +242,7 @@ flu_restart_test() ->
|
||||||
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
|
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
|
||||||
<<"prefix">>, Data, 42, infinity)
|
<<"prefix">>, Data, 42, infinity)
|
||||||
end,
|
end,
|
||||||
fun(run) -> {ok, Data} =
|
fun(run) -> {ok, [{_, Off1, Data, _}]} =
|
||||||
?MUT:read_chunk(Prox1, FakeEpoch,
|
?MUT:read_chunk(Prox1, FakeEpoch,
|
||||||
File1, Off1, Size1),
|
File1, Off1, Size1),
|
||||||
ok;
|
ok;
|
||||||
|
|
Loading…
Reference in a new issue