Do the slicing in flu server rather than in CR client
This commit is contained in:
parent
c5661571e3
commit
3d6d4d8be3
3 changed files with 29 additions and 39 deletions
|
@ -112,7 +112,6 @@
|
|||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-export([trim_both_side/3]).
|
||||
-endif. % TEST.
|
||||
|
||||
-export([start_link/1]).
|
||||
|
@ -529,8 +528,7 @@ do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
|
|||
ConsistencyMode = P#projection_v1.mode,
|
||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
||||
File, Offset, Size, Opts, ?TIMEOUT) of
|
||||
{ok, {Chunks0, []}} when is_list(Chunks0) ->
|
||||
Chunks = trim_both_side(Chunks0, Offset, Offset + Size),
|
||||
{ok, {Chunks, []}} when is_list(Chunks) ->
|
||||
{reply, {ok, {Chunks, []}}, S};
|
||||
%% {ok, BadChunk} ->
|
||||
%% %% TODO cleaner handling of bad chunks
|
||||
|
@ -926,28 +924,3 @@ timeout(infinity) ->
|
|||
timeout(15*60*1000); % close enough to infinity
|
||||
timeout(Timeout0) ->
|
||||
{Timeout0, Timeout0 + 30*1000}.
|
||||
|
||||
%% @doc Trim both right and left border of chunks to fit in to given
|
||||
%% range [LeftPos, RightPos]. TODO: write unit tests for this function.
|
||||
trim_both_side([], _, _) -> [];
|
||||
trim_both_side([{F, Offset, Chunk, _Csum}|L], LeftPos, RightPos)
|
||||
when Offset < LeftPos andalso LeftPos < RightPos ->
|
||||
TrashLen = 8 * (LeftPos - Offset),
|
||||
<<_:TrashLen/binary, NewChunk/binary>> = Chunk,
|
||||
%% CR Client is client or server?
|
||||
NewChecksum = machi_util:make_tagged_csum(client_sha, Chunk),
|
||||
NewH = {F, LeftPos, NewChunk, NewChecksum},
|
||||
trim_both_side([NewH|L], LeftPos, RightPos);
|
||||
trim_both_side(Chunks, LeftPos, RightPos) when LeftPos =< RightPos ->
|
||||
%% TODO: optimize
|
||||
[{F, Offset, Chunk, _Csum}|L] = lists:reverse(Chunks),
|
||||
Size = iolist_size(Chunk),
|
||||
if RightPos < Offset + Size ->
|
||||
NewSize = RightPos - Offset,
|
||||
<<NewChunk:NewSize/binary, _/binary>> = Chunk,
|
||||
%% CR Client is client or server?
|
||||
NewChecksum = machi_util:make_tagged_csum(client_sha, Chunk),
|
||||
lists:reverse([{F, Offset, NewChunk, NewChecksum}|L]);
|
||||
true ->
|
||||
Chunks
|
||||
end.
|
||||
|
|
|
@ -281,9 +281,7 @@ handle_call({read, Offset, Length, Opts}, _From,
|
|||
State = #state{filename = F,
|
||||
data_filehandle = FH,
|
||||
csum_table = CsumTable,
|
||||
eof_position = EofP,
|
||||
reads = {T, Err}
|
||||
}) ->
|
||||
reads = {T, Err}}) ->
|
||||
NoChecksum = proplists:get_value(no_checksum, Opts, false),
|
||||
NoChunk = proplists:get_value(no_chunk, Opts, false),
|
||||
NeedsMerge = proplists:get_value(needs_trimmed, Opts, false),
|
||||
|
@ -291,12 +289,9 @@ handle_call({read, Offset, Length, Opts}, _From,
|
|||
case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of
|
||||
{ok, {[], []}} ->
|
||||
{{error, not_written}, Err + 1};
|
||||
{ok, {Chunks, Trimmed}} ->
|
||||
%% Kludge to wrap read result in tuples, to support fragmented read
|
||||
%% XXX FIXME
|
||||
%% For now we are omiting the checksum data because it blows up
|
||||
%% protobufs.
|
||||
{{ok, {Chunks, Trimmed}}, Err};
|
||||
{ok, {Chunks0, Trimmed0}} ->
|
||||
Chunks = slice_both_side(Chunks0, Offset, Offset+Length),
|
||||
{{ok, {Chunks, Trimmed0}}, Err};
|
||||
Error ->
|
||||
lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
|
||||
{Error, Err + 1}
|
||||
|
@ -664,3 +659,26 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
|
|||
[Other, Filename, Offset, Size]),
|
||||
{error, Other}
|
||||
end.
|
||||
|
||||
%% @doc Trim both right and left border of chunks to fit in to given
|
||||
%% range [LeftPos, RightPos]. TODO: write unit tests for this function.
|
||||
slice_both_side([], _, _) -> [];
|
||||
slice_both_side([{F, Offset, Chunk, _Csum}|L], LeftPos, RightPos)
|
||||
when Offset < LeftPos andalso LeftPos < RightPos ->
|
||||
TrashLen = 8 * (LeftPos - Offset),
|
||||
<<_:TrashLen/binary, NewChunk/binary>> = Chunk,
|
||||
NewChecksum = machi_util:make_tagged_csum(client_sha, Chunk),
|
||||
NewH = {F, LeftPos, NewChunk, NewChecksum},
|
||||
slice_both_side([NewH|L], LeftPos, RightPos);
|
||||
slice_both_side(Chunks, LeftPos, RightPos) when LeftPos =< RightPos ->
|
||||
%% TODO: optimize
|
||||
[{F, Offset, Chunk, _Csum}|L] = lists:reverse(Chunks),
|
||||
Size = iolist_size(Chunk),
|
||||
if RightPos < Offset + Size ->
|
||||
NewSize = RightPos - Offset,
|
||||
<<NewChunk:NewSize/binary, _/binary>> = Chunk,
|
||||
NewChecksum = machi_util:make_tagged_csum(client_sha, Chunk),
|
||||
lists:reverse([{F, Offset, NewChunk, NewChecksum}|L]);
|
||||
true ->
|
||||
Chunks
|
||||
end.
|
||||
|
|
|
@ -203,8 +203,7 @@ read_next(S, _Res, _Args) -> S.
|
|||
read(Pid, Offset, Length) ->
|
||||
case machi_file_proxy:read(Pid, Offset, Length) of
|
||||
{ok, {Chunks, _}} ->
|
||||
[{_, Offset, Data, Csum}] =
|
||||
machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
|
||||
[{_, Offset, Data, Csum}] = Chunks,
|
||||
{ok, Data, Csum};
|
||||
E ->
|
||||
E
|
||||
|
|
Loading…
Reference in a new issue