diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index ac7134b..09ef18d 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -119,6 +119,7 @@ %% File API append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, + write_chunk/4, write_chunk/5, read_chunk/4, read_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2, @@ -180,6 +181,19 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> Chunk, ChunkExtra}}, Timeout). +%% @doc Write a chunk of data (that has already been +%% allocated/sequenced by an earlier append_chunk_extra() call) to +%% `File' at `Offset'. + +write_chunk(PidSpec, File, Offset, Chunk) -> + write_chunk(PidSpec, File, Offset, Chunk, ?DEFAULT_TIMEOUT). + +%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. + +write_chunk(PidSpec, File, Offset, Chunk, Timeout) -> + gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}}, + Timeout). + %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. read_chunk(PidSpec, File, Offset, Size) -> @@ -252,6 +266,8 @@ code_change(_OldVsn, S, _Extra) -> handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); +handle_call2({write_chunk, File, Offset, Chunk}, _From, S) -> + do_write_head(File, Offset, Chunk, 0, os:timestamp(), S); handle_call2({read_chunk, File, Offset, Size}, _From, S) -> do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); handle_call2({checksum_list, File}, _From, S) -> @@ -338,8 +354,13 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, %% over with a new sequencer assignment, at %% the 2nd have of the impl (we have already %% slept & refreshed the projection). - do_append_head2(Prefix, Chunk, ChunkExtra, Depth, - STime, S2); + + if Prefix == undefined -> % atom! not binary()!! + {error, partition}; + true -> + do_append_head2(Prefix, Chunk, ChunkExtra, + Depth, STime, S2) + end; RestFLUs3 -> do_append_midtail2(RestFLUs3, Prefix, File, Offset, Chunk, ChunkExtra, @@ -378,6 +399,51 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) end. +do_write_head(File, Offset, Chunk, 0=Depth, STime, S) -> + do_write_head2(File, Offset, Chunk, Depth + 1, STime, S); +do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "head sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_write_head(File, Offset, Chunk, Depth + 1, STime, S2); + _ -> + do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2) + end + end. + +do_write_head2(File, Offset, Chunk, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + %% From this point onward, we use the same code & logic path as + %% append does. + do_append_midtail(RestFLUs, undefined, File, Offset, Chunk, + undefined, [HeadFLU], 0, STime, S); + {error, bad_checksum}=BadCS -> + {reply, BadCS, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_write_head(File, Offset, Chunk, Depth, STime, S); + {error, written}=Err -> + Err; + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE, + iolist_size(Chunk)}) + end. + do_read_chunk(File, Offset, Size, 0=Depth, STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 6466294..e6db150 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -135,6 +135,29 @@ smoke_test2() -> %% Exactly one file right now {ok, [_]} = machi_cr_client:list_files(C1), + %% Go back and test append_chunk_extra() and write_chunk() + Chunk10 = <<"It's a different chunk!">>, + Size10 = byte_size(Chunk10), + Extra10 = 5, + {ok, {Off10,Size10,File10}} = + machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, + Extra10 * Size10), + {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Off10, Size10), + [begin + Offx = Off10 + (Seq * Size10), + %% TODO: uncomment written/not_written enforcement is available. + %% {error,not_written} = machi_cr_client:read_chunk(C1, File10, + %% Offx, Size10), + {ok, {Offx,Size10,File10}} = + machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), + {ok, Chunk10} = machi_cr_client:read_chunk(C1, File10, Offx, + Size10) + end || Seq <- lists:seq(1, Extra10)], + {ok, {Off11,Size11,File11}} = + machi_cr_client:append_chunk(C1, Prefix, Chunk10), + %% Double-check that our reserved extra bytes were really honored! + true = (Off11 > (Off10 + (Extra10 * Size10))), + ok after error_logger:tty(true),