Add write_chunk() to machi_cr_client.erl

This commit is contained in:
Scott Lystig Fritchie 2015-06-19 14:49:09 +09:00
parent 40c0a72b48
commit 3c300bb9f1
2 changed files with 91 additions and 2 deletions

View file

@ -119,6 +119,7 @@
%% File API
append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2,
@ -180,6 +181,19 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
Chunk, ChunkExtra}},
Timeout).
%% @doc Write a chunk of data (that has already been
%% allocated/sequenced by an earlier append_chunk_extra() call) to
%% `File' at `Offset'.
write_chunk(PidSpec, File, Offset, Chunk) ->
write_chunk(PidSpec, File, Offset, Chunk, ?DEFAULT_TIMEOUT).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
write_chunk(PidSpec, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size) ->
@ -252,6 +266,8 @@ code_change(_OldVsn, S, _Extra) ->
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
handle_call2({write_chunk, File, Offset, Chunk}, _From, S) ->
do_write_head(File, Offset, Chunk, 0, os:timestamp(), S);
handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), S);
handle_call2({checksum_list, File}, _From, S) ->
@ -338,8 +354,13 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
%% over with a new sequencer assignment, at
%% the 2nd have of the impl (we have already
%% slept & refreshed the projection).
do_append_head2(Prefix, Chunk, ChunkExtra, Depth,
STime, S2);
if Prefix == undefined -> % atom! not binary()!!
{error, partition};
true ->
do_append_head2(Prefix, Chunk, ChunkExtra,
Depth, STime, S2)
end;
RestFLUs3 ->
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
Chunk, ChunkExtra,
@ -378,6 +399,51 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
end.
do_write_head(File, Offset, Chunk, 0=Depth, STime, S) ->
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S);
do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "head sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_write_head(File, Offset, Chunk, Depth + 1, STime, S2);
_ ->
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2)
end
end.
do_write_head2(File, Offset, Chunk, Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
%% From this point onward, we use the same code & logic path as
%% append does.
do_append_midtail(RestFLUs, undefined, File, Offset, Chunk,
undefined, [HeadFLU], 0, STime, S);
{error, bad_checksum}=BadCS ->
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_write_head(File, Offset, Chunk, Depth, STime, S);
{error, written}=Err ->
Err;
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,
iolist_size(Chunk)})
end.
do_read_chunk(File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);

View file

@ -135,6 +135,29 @@ smoke_test2() ->
%% Exactly one file right now
{ok, [_]} = machi_cr_client:list_files(C1),
%% Go back and test append_chunk_extra() and write_chunk()
Chunk10 = <<"It's a different chunk!">>,
Size10 = byte_size(Chunk10),
Extra10 = 5,
{ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10),
[begin
Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available.
%% {error,not_written} = machi_cr_client:read_chunk(C1, File10,
%% Offx, Size10),
{ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx,
Size10)
end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
%% Double-check that our reserved extra bytes were really honored!
true = (Off11 > (Off10 + (Extra10 * Size10))),
ok
after
error_logger:tty(true),