diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index a7bad0f..ff6748f 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -131,9 +131,8 @@ sync(_Pid, Type) -> {error, bad_arg}. % @doc Read file at offset for length. This returns a sequence of all -% chunks that overlaps with requested offset and length. Note that -% borders are not aligned, not to mess up repair at cr_client with -% checksums. They should be cut at cr_client. +% written and trimmed (optional) bytes that overlaps with requested +% offset and length. Borders are not aligned. -spec read(Pid :: pid(), Offset :: non_neg_integer(), Length :: non_neg_integer()) -> @@ -223,6 +222,8 @@ init({FluName, Filename, DataDir}) -> UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable), {Eof, infinity} = lists:last(UnwrittenBytes), {ok, FHd} = file:open(DPath, [read, write, binary, raw]), + %% Reserve for EC and stuff, to prevent eof when read + ok = file:pwrite(FHd, 0, binary:copy(<<"so what?">>, ?MINIMUM_OFFSET div 8)), Tref = schedule_tick(), St = #state{ fluname = FluName, @@ -368,7 +369,7 @@ handle_call({trim, Offset, Size, _TriggerGC}, _From, trims = {T, Err}, csum_table = CsumTable}) -> - case machi_csum_table:all_trimmed(CsumTable, Offset, Size) of + case machi_csum_table:all_trimmed(CsumTable, Offset, Offset+Size) of true -> NewState = State#state{ops=Ops+1, trims={T, Err+1}}, %% All bytes of that range was already trimmed returns ok @@ -385,7 +386,10 @@ handle_call({trim, Offset, Size, _TriggerGC}, _From, case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of ok -> - NewState = State#state{ops=Ops+1, trims={T+1, Err}}, + {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), + NewState = State#state{ops=Ops+1, + trims={T+1, Err}, + eof_position=NewEof}, maybe_gc(ok, NewState); Error -> {reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}} @@ -573,6 +577,15 @@ check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA; false -> {error, {bad_csum, Csum}} end; +check_or_make_tagged_csum(?CSUM_TAG_SERVER_REGEN_SHA, + InCsum, Data) -> + Csum = machi_util:checksum_chunk(Data), + case Csum =:= InCsum of + true -> + machi_util:make_tagged_csum(server_regen_sha, Csum); + false -> + {error, {bad_csum, Csum}} + end; check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> lager:warning("Unknown checksum tag ~p", [OtherTag]), {error, bad_checksum}. @@ -669,6 +682,7 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, Trimm % caller as `ok' handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) -> Size = iolist_size(Data), + case machi_csum_table:find(CsumTable, Offset, Size) of [] -> %% Nothing should be there try @@ -762,7 +776,7 @@ slice_both_side([], _, _) -> []; slice_both_side([{F, Offset, Chunk, _Csum}|L], LeftPos, RightPos) when Offset < LeftPos andalso LeftPos < RightPos -> - TrashLen = 8 * (LeftPos - Offset), + TrashLen = (LeftPos - Offset), <<_:TrashLen/binary, NewChunk/binary>> = Chunk, NewChecksum = machi_util:make_tagged_csum(?CSUM_TAG_SERVER_REGEN_SHA_ATOM, Chunk), NewH = {F, LeftPos, NewChunk, NewChecksum}, @@ -787,7 +801,7 @@ maybe_regenerate_checksum(_, {_, _, trimmed} = Change) -> maybe_regenerate_checksum(FHd, {Offset, Size, _Csum}) -> case file:pread(FHd, Offset, Size) of eof -> - error(eof); + error({eof, Offset, Size}); {ok, Bytes} when byte_size(Bytes) =:= Size -> TaggedCsum = machi_util:make_tagged_csum(server_regen_sha, @@ -797,9 +811,9 @@ maybe_regenerate_checksum(FHd, {Offset, Size, _Csum}) -> throw(Error) end. -%% GC: make sure unwritte bytes = [{Eof, infinity}] and Eof is > max file size -%% walk through the checksum table and make sure all chunks trimmed -%% Then unlink the file +%% GC: make sure unwritten bytes = [{Eof, infinity}] and Eof is > max +%% file size walk through the checksum table and make sure all chunks +%% trimmed Then unlink the file -spec maybe_gc(term(), #state{}) -> {reply, term(), #state{}} | {stop, normal, term(), #state{}}. maybe_gc(Reply, S = #state{eof_position = Eof, diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index ec2dfc6..ef1e740 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -58,68 +58,110 @@ count=0 :: non_neg_integer() }). +%% @doc official error types that is specific in Machi +-type machi_client_error_reason() :: bad_arg | wedged | bad_checksum | + partition | not_written | written | + trimmed | no_such_file | partial_read | + bad_epoch | inet:posix(). + +%% @doc Creates a client process +-spec start_link(p_srvr_dict()) -> {ok, pid()} | {error, machi_client_error_reason()}. start_link(P_srvr_list) -> gen_server:start_link(?MODULE, [P_srvr_list], []). +%% @doc Stops a client process. +-spec quit(pid()) -> ok. quit(PidSpec) -> gen_server:call(PidSpec, quit, infinity). connected_p(PidSpec) -> gen_server:call(PidSpec, connected_p, infinity). +-spec echo(pid(), string()) -> {ok, string()} | {error, machi_client_error_reason()}. echo(PidSpec, String) -> echo(PidSpec, String, ?DEFAULT_TIMEOUT). +-spec echo(pid(), string(), non_neg_integer()) -> {ok, string()} | {error, machi_client_error_reason()}. echo(PidSpec, String, Timeout) -> send_sync(PidSpec, {echo, String}, Timeout). %% TODO: auth() is not implemented. Auth requires SSL, and this client %% doesn't support SSL yet. This is just a placeholder and reminder. +-spec auth(pid(), string(), string()) -> ok | {error, machi_client_error_reason()}. auth(PidSpec, User, Pass) -> auth(PidSpec, User, Pass, ?DEFAULT_TIMEOUT). +-spec auth(pid(), string(), string(), non_neg_integer()) -> ok | {error, machi_client_error_reason()}. auth(PidSpec, User, Pass, Timeout) -> send_sync(PidSpec, {auth, User, Pass}, Timeout). +-spec append_chunk(pid(), PlacementKey::binary(), Prefix::binary(), Chunk::binary(), + CSum::binary(), ChunkExtra::non_neg_integer()) -> + {ok, Filename::string(), Offset::machi_dt:file_offset()} | + {error, machi_client_error_reason()}. append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra) -> append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra, ?DEFAULT_TIMEOUT). +-spec append_chunk(pid(), PlacementKey::binary(), Prefix::binary(), + Chunk::binary(), CSum::binary(), + ChunkExtra::non_neg_integer(), + Timeout::non_neg_integer()) -> + {ok, Filename::string(), Offset::machi_dt:file_offset()} | + {error, machi_client_error_reason()}. append_chunk(PidSpec, PlacementKey, Prefix, Chunk, CSum, ChunkExtra, Timeout) -> send_sync(PidSpec, {append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, Timeout). +-spec write_chunk(pid(), File::string(), machi_dt:file_offset(), + Chunk::binary(), CSum::binary()) -> + ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, ?DEFAULT_TIMEOUT). +-spec write_chunk(pid(), File::string(), machi_dt:file_offset(), + Chunk::binary(), CSum::binary(), Timeout::non_neg_integer()) -> + ok | {error, machi_client_error_reason()}. write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). %% @doc Tries to read a chunk of a specified file. It returns `{ok, %% {Chunks, TrimmedChunks}}' for live file while it returns `{error, %% trimmed}' if all bytes of the file was trimmed. --spec read_chunk(pid(), string(), pos_integer(), pos_integer(), +-spec read_chunk(pid(), File::string(), machi_dt:file_offset(), machi_dt:chunk_size(), [{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}]) -> - {ok, {list(), list()}} | {error, term()}. + {ok, {Chunks::[{File::string(), machi_dt:file_offset(), machi_dt:chunk_size(), binary()}], + Trimmed::[{File::string(), machi_dt:file_offset(), machi_dt:chunk_size()}]}} | + {error, machi_client_error_reason()}. read_chunk(PidSpec, File, Offset, Size, Options) -> read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT). --spec read_chunk(pid(), string(), pos_integer(), pos_integer(), - [{no_checksum | no_chunk | needs_trimmed, boolean()}], - pos_integer()) -> - {ok, {list(), list()}} | {error, term()}. +-spec read_chunk(pid(), File::string(), machi_dt:file_offset(), machi_dt:chunk_size(), + [{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}], + Timeout::non_neg_integer()) -> + {ok, {Chunks::[{File::string(), machi_dt:file_offset(), machi_dt:chunk_size(), binary()}], + Trimmed::[{File::string(), machi_dt:file_offset(), machi_dt:chunk_size()}]}} | + {error, machi_client_error_reason()}. read_chunk(PidSpec, File, Offset, Size, Options, Timeout) -> send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout). -%% @doc Trims arbitrary binary range of any file. TODO: Add option -%% specifying whether to trigger GC. +%% @doc Trims arbitrary binary range of any file. If a specified range +%% has any byte trimmed, it fails and returns `{error, trimmed}`. +%% Otherwise it trims all bytes in that range. If there are +%% overlapping chunks with client-specified checksum, they will cut +%% off and checksum are re-calculated in server side. TODO: Add +%% option specifying whether to trigger GC. -spec trim_chunk(pid(), string(), non_neg_integer(), machi_dt:chunk_size()) -> - ok | {error, term()}. + ok | {error, machi_client_error_reason()}. trim_chunk(PidSpec, File, Offset, Size) -> trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). trim_chunk(PidSpec, File, Offset, Size, Timeout) -> send_sync(PidSpec, {trim_chunk, File, Offset, Size}, Timeout). +%% @doc Returns a binary that has checksums and chunks encoded inside +%% (This is because encoding-decoding them are inefficient). TODO: +%% return a structured list of them. +-spec checksum_list(pid(), string()) -> {ok, binary()} | {error, machi_client_error_reason()}. checksum_list(PidSpec, File) -> checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT). diff --git a/test/machi_csum_table_test.erl b/test/machi_csum_table_test.erl index 683d512..c168d45 100644 --- a/test/machi_csum_table_test.erl +++ b/test/machi_csum_table_test.erl @@ -54,7 +54,6 @@ smoke2_test() -> ok = machi_csum_table:close(MC), ok = machi_csum_table:delete(MC). - smoke3_test() -> Filename = "./temp-checksum-dumb-file-4", _ = file:delete(Filename), @@ -107,5 +106,4 @@ smoke3_test() -> ok = machi_csum_table:close(MC), ok = machi_csum_table:delete(MC). - %% TODO: add quickcheck test here diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index e2eb955..c0625e3 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -37,7 +37,7 @@ eqc_test_() -> {timeout, 60, {spawn, [ - {timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))} + {timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))} ] }}. @@ -88,12 +88,14 @@ data_with_csum(Limit) -> intervals([]) -> []; intervals([N]) -> - [{N, choose(1,150)}]; + [{N, choose(1,1)}]; intervals([A,B|T]) -> - [{A, choose(1, B-A)}|intervals([B|T])]. + [{A, oneof([choose(1, B-A), B-A])}|intervals([B|T])]. interval_list() -> - ?LET(L, list(choose(1024, 4096)), intervals(lists:usort(L))). + ?LET(L, + oneof([list(choose(1025, 1033)), list(choose(1024, 4096))]), + intervals(lists:usort(L))). shuffle_interval() -> ?LET(L, interval_list(), shuffle(L)). @@ -103,40 +105,98 @@ get_written_interval(L) -> %% INITIALIZATION --record(state, {pid, prev_extra = 0, planned_writes=[], written=[]}). +-record(state, {pid, prev_extra = 0, + planned_writes=[], + planned_trims=[], + written=[], + trimmed=[]}). initial_state() -> #state{written=[{0,1024}]}. -initial_state(I) -> #state{written=[{0,1024}], planned_writes=I}. +initial_state(I, T) -> #state{written=[{0,1024}], + planned_writes=I, + planned_trims=T}. weight(_S, rewrite) -> 1; weight(_S, _) -> 2. %% HELPERS -%% check if an operation is permitted based on whether a write has -%% occurred -check_writes(_Op, [], _Off, _L) -> - false; -check_writes(_Op, [{Pos, Sz}|_T], Off, L) when Pos == Off - andalso Sz == L -> - mostly_true; -check_writes(read, [{Pos, Sz}|_T], Off, L) when Off >= Pos - andalso Off < (Pos + Sz) - andalso Sz >= ( L - ( Off - Pos ) ) -> - true; -check_writes(write, [{Pos, Sz}|_T], Off, L) when ( Off + L ) > Pos - andalso Off < (Pos + Sz) -> - true; -check_writes(Op, [_H|T], Off, L) -> - check_writes(Op, T, Off, L). +get_overlaps(_Offset, _Len, [], Acc) -> lists:reverse(Acc); +get_overlaps(Offset, Len, [{Pos, Sz} = Ck|T], Acc0) +%% Overlap judgement differnt from the one in machi_csum_table +%% [a=Offset, b), [x=Pos, y) ... + when + %% a =< x && x < b && b =< y + (Offset =< Pos andalso Pos < Offset + Len andalso Offset + Len =< Pos + Sz) orelse + %% a =< x && y < b + (Offset =< Pos andalso Pos + Sz < Offset + Len) orelse + %% x < a && a < y && y =< b + (Pos < Offset andalso Offset < Pos + Sz andalso Pos + Sz =< Offset + Len) orelse + %% x < a && b < y + (Pos < Offset + Len andalso Offset + Len < Pos + Sz) -> + get_overlaps(Offset, Len, T, [Ck|Acc0]); +get_overlaps(Offset, Len, [_Ck|T], Acc0) -> + %% ?debugVal({Offset, Len, _Ck}), + %% ?debugVal(Offset =< Pos andalso Pos < Offset + Len andalso Offset + Len =< Pos + Sz), + %% ?debugVal(Offset =< Pos andalso Pos + Sz < Offset + Len), + %% ?debugVal(Pos < Offset andalso Offset < Pos + Sz andalso Pos + Sz < Offset + Len), + %% ?debugVal(Pos < Offset + Len andalso Offset + Len < Pos + Sz), + get_overlaps(Offset, Len, T, Acc0). + +%% Inefficient but simple easy code to verify by eyes - returns all +%% bytes that fits in (Offset, Len) +chop(Offset, Len, List) -> + ChopLeft = fun({Pos, Sz}) when Pos < Offset andalso Offset =< Pos + Sz -> + {Offset, Sz + Pos - Offset}; + ({Pos, Sz}) when Offset =< Pos andalso Pos + Sz < Offset + Len -> + {Pos, Sz}; + ({Pos, _Sz}) when Offset =< Pos -> + {Pos, Offset + Len - Pos} + end, + ChopRight = fun({Pos, Sz}) when Offset + Len < Pos + Sz -> + {Pos, Offset + Len - Pos}; + ({Pos, Sz}) -> + {Pos, Sz} + end, + Filter0 = fun({_, 0}) -> false; + (Other) -> {true, Other} end, + lists:filtermap(fun(E) -> Filter0(ChopRight(ChopLeft(E))) end, + List). + +%% Returns all bytes that are at left side of the Offset +chopped_left(_Offset, []) -> undefined; +chopped_left(Offset, [{Pos,_Sz}|_]) when Pos < Offset -> + {Pos, Offset - Pos}; +chopped_left(_, _) -> + undefined. + +chopped_right(_Offset, []) -> undefined; +chopped_right(Offset, List) -> + {Pos, Sz} = lists:last(List), + if Offset < Pos + Sz -> + {Offset, Pos + Sz - Offset}; + true -> + undefined + end. + +cleanup_chunk(Offset, Length, ChunkList) -> + Overlaps = get_overlaps(Offset, Length, ChunkList, []), + NewCL0 = lists:foldl(fun lists:delete/2, + ChunkList, Overlaps), + NewCL1 = case chopped_left(Offset, Overlaps) of + undefined -> NewCL0; + LeftRemain -> [LeftRemain|NewCL0] + end, + NewCL2 = case chopped_right(Offset+Length, Overlaps) of + undefined -> NewCL1; + RightRemain -> [RightRemain|NewCL1] + end, + lists:sort(NewCL2). is_error({error, _}) -> true; is_error({error, _, _}) -> true; is_error(Other) -> {expected_ERROR, Other}. -probably_error(ok) -> true; -probably_error(V) -> is_error(V). - is_ok({ok, _, _}) -> true; is_ok(ok) -> true; is_ok(Other) -> {expected_OK, Other}. @@ -144,9 +204,10 @@ is_ok(Other) -> {expected_OK, Other}. get_offset({ok, _Filename, Offset}) -> Offset; get_offset(_) -> error(badarg). -offset_valid(Offset, Extra, L) -> - {Pos, Sz} = lists:last(L), - Offset == Pos + Sz + Extra. +last_byte([]) -> 0; +last_byte(L0) -> + L1 = lists:map(fun({Pos, Sz}) -> Pos + Sz end, L0), + lists:last(lists:sort(L1)). -define(TESTDIR, "./eqc"). @@ -162,7 +223,7 @@ cleanup() -> %% start start_pre(S) -> - S#state.pid == undefined. + S#state.pid =:= undefined. start_command(S) -> {call, ?MODULE, start, [S]}. @@ -185,29 +246,39 @@ read_pre(S) -> read_args(S) -> [S#state.pid, offset(), len()]. -read_ok(S, Off, L) -> - case S#state.written of - [{0, 1024}] -> false; - W -> check_writes(read, W, Off, L) - end. - read_post(S, [_Pid, Off, L], Res) -> - case read_ok(S, Off, L) of - true -> is_ok(Res); - mostly_true -> is_ok(Res); - false -> is_error(Res) + Written = get_overlaps(Off, L, S#state.written, []), + Chopped = chop(Off, L, Written), + Trimmed = get_overlaps(Off, L, S#state.trimmed, []), + Eof = lists:max([Pos+Sz||{Pos,Sz}<-S#state.written]), + %% ?debugVal({Off, L}), + %% ?debugVal(S), + case Res of + {ok, {Written0, Trimmed0}} -> + Written1 = lists:map(fun({_, Pos, Chunk, _}) -> + {Pos, iolist_size(Chunk)} + end, Written0), + Trimmed1 = lists:map(fun({_, Pos, Sz}) -> {Pos, Sz} end, Trimmed0), + %% ?debugVal({Written, Chopped, Written1}), + %% ?debugVal({Trimmed, Trimmed1}), + %% ?assertEqual(Chopped, Written1), + %% ?assertEqual(Trimmed, Trimmed1), + Chopped =:= Written1 + andalso Trimmed =:= Trimmed1; + %% TODO: such response are ugly, rethink the SPEC + {error, not_written} when Eof < Off + L -> + true; + {error, not_written} when Chopped =:= [] andalso Trimmed =:= [] -> + true; + Other -> + ?debugVal(Other), + is_error(Res) end. read_next(S, _Res, _Args) -> S. read(Pid, Offset, Length) -> - case machi_file_proxy:read(Pid, Offset, Length) of - {ok, {Chunks, _}} -> - [{_, Offset, Data, Csum}] = Chunks, - {ok, Data, Csum}; - E -> - E - end. + machi_file_proxy:read(Pid, Offset, Length, [{needs_trimmed, true}]). %% write @@ -216,6 +287,7 @@ write_pre(S) -> %% do not allow writes with empty data write_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) -> + ?assert(false), false; write_pre(_S, _Args) -> true. @@ -224,28 +296,18 @@ write_args(S) -> {Off, Len} = hd(S#state.planned_writes), [S#state.pid, Off, data_with_csum(Len)]. -write_ok(_S, [_Pid, Off, _Data]) when Off < 1024 -> false; -write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) -> +write_post(S, [_Pid, Off, {Bin, _Tag, _Csum}] = _Args, Res) -> Size = iolist_size(Bin), - %% Check writes checks if a byte range is *written* - %% So writes are ok IFF they are NOT written, so - %% we want not check_writes/3 to be true. - check_writes(write, S#state.written, Off, Size). - -write_post(S, Args, Res) -> - case write_ok(S, Args) of - %% false means this range has NOT been written before, so - %% it should succeed - false -> eq(Res, ok); - %% mostly true means we've written this range before BUT - %% as a special case if we get a call to write the EXACT - %% same data that's already on the disk, we return "ok" - %% instead of {error, written}. - mostly_true -> probably_error(Res); - %% If we get true, then we've already written this section - %% or a portion of this range to disk and should return an - %% error. - true -> is_error(Res) + case {get_overlaps(Off, Size, S#state.written, []), + get_overlaps(Off, Size, S#state.trimmed, [])} of + {[], []} -> + %% No overlap neither with written ranges nor trimmed + %% ranges; OK to write things. + eq(Res, ok); + {_, _} -> + %% overlap found in either or both at written or at + %% trimmed ranges; can't write. + is_error(Res) end. write_next(S, Res, [_Pid, Offset, {Bin, _Tag, _Csum}]) -> @@ -266,6 +328,7 @@ write(Pid, Offset, {Bin, Tag, Csum}) -> %% append append_pre(S) -> + ?assert(undefined =/= S#state.written), S#state.pid /= undefined. %% do not allow appends with empty binary data @@ -286,7 +349,9 @@ append_next(S, Res, [_Pid, Extra, {Bin, _Tag, _Csum}]) -> case is_ok(Res) of true -> Offset = get_offset(Res), - true = offset_valid(Offset, S#state.prev_extra, S#state.written), + Expected = erlang:max(last_byte(S#state.written) + S#state.prev_extra, + last_byte(S#state.trimmed)), + ?assertEqual(Expected, Offset), S#state{prev_extra = Extra, written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}])}; _ -> S @@ -300,11 +365,13 @@ append_post(_S, _Args, Res) -> %% rewrite rewrite_pre(S) -> - S#state.pid /= undefined andalso S#state.written /= []. + S#state.pid /= undefined andalso + (S#state.written ++ S#state.trimmed) /= [] . rewrite_args(S) -> - ?LET({Off, Len}, get_written_interval(S#state.written), - [S#state.pid, Off, data_with_csum(Len)]). + ?LET({Off, Len}, + get_written_interval(S#state.written ++ S#state.trimmed), + [S#state.pid, Off, data_with_csum(Len)]). rewrite(Pid, Offset, {Bin, Tag, Csum}) -> Meta = [{client_csum_tag, Tag}, @@ -317,18 +384,79 @@ rewrite_post(_S, _Args, Res) -> rewrite_next(S, _Res, _Args) -> S#state{prev_extra = 0}. +%% trim + +trim_pre(S) -> + S#state.pid /= undefined andalso S#state.planned_trims /= []. + +trim_args(S) -> + {Offset, Length} = hd(S#state.planned_trims), + [S#state.pid, Offset, Length]. + +trim(Pid, Offset, Length) -> + machi_file_proxy:trim(Pid, Offset, Length, false). + +trim_post(_S, [_Pid, _Offset, _Length], ok) -> + true; +trim_post(_S, [_Pid, _Offset, _Length], _Res) -> + false. + +trim_next(S, Res, [_Pid, Offset, Length]) -> + S1 = case is_ok(Res) of + true -> + NewWritten = cleanup_chunk(Offset, Length, S#state.written), + Trimmed1 = cleanup_chunk(Offset, Length, S#state.trimmed), + NewTrimmed = lists:sort([{Offset, Length}|Trimmed1]), + S#state{trimmed=NewTrimmed, + written=NewWritten}; + _Other -> + S + end, + S1#state{prev_extra=0, + planned_trims=tl(S#state.planned_trims)}. + %% Property prop_ok() -> cleanup(), - ?FORALL(I, shuffle_interval(), - ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I)), + ?FORALL({I, T}, + {shuffle_interval(), shuffle_interval()}, + ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I, T)), begin - {H, S, Res} = run_parallel_commands(?MODULE, Cmds), + {H, S, Res} = run_parallel_commands(?MODULE, Cmds), + %% case S#state.pid of + %% undefined -> noop; + %% Pid -> + %% machi_file_proxy:stop(Pid) + %% end, pretty_commands(?MODULE, Cmds, {H, S, Res}, - aggregate(command_names(Cmds), Res == ok)) + aggregate(command_names(Cmds), Res == ok)) end) ). +%% Test for tester functions +chopper_test_() -> + [?_assertEqual([{0, 1024}], + get_overlaps(1, 1, [{0, 1024}], [])), + ?_assertEqual([], + get_overlaps(10, 5, [{9, 1}, {15, 1}], [])), + ?_assertEqual([{9,2},{14,1}], + get_overlaps(10, 5, [{9, 2}, {14, 1}], [])), + ?_assertEqual([], chop(0, 0, [{0,2}])), + ?_assertEqual([{0, 1}], chop(0, 1, [{0,2}])), + ?_assertEqual([], chop(1, 0, [{0,2}])), + ?_assertEqual([{1, 1}], chop(1, 1, [{0,2}])), + ?_assertEqual([{1, 1}], chop(1, 2, [{0,2}])), + ?_assertEqual([], chop(2, 1, [{0,2}])), + ?_assertEqual([], chop(2, 2, [{0,2}])), + ?_assertEqual([{1, 1}], chop(1, 3, [{0,2}])), + ?_assertError(_, chop(3, 1, [{0,2}])), + ?_assertEqual([], chop(2, 3, [{0,2}])), + ?_assertEqual({0, 1}, chopped_left(1, [{0, 1024}])), + ?_assertEqual([{0, 1}, {2, 1022}], cleanup_chunk(1, 1, [{0, 1024}])), + ?_assertEqual([{2, 1022}], cleanup_chunk(0, 2, [{0, 1}, {2, 1022}])), + ?_assert(true) + ]. + -endif. % EQC -endif. % TEST diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index cbf2014..8c4b60b 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -83,9 +83,9 @@ machi_file_proxy_test_() -> ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), + ?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), - ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), + ?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), @@ -102,6 +102,10 @@ multiple_chunks_read_test_() -> clean_up_data_dir(?TESTDIR), {ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR), [ + ?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)), + ?_assertMatch({ok, {[], [{"test", 0, 1}]}}, + machi_file_proxy:read(Pid, 0, 1, + [{needs_trimmed, true}])), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)), ?_assertEqual(ok, machi_file_proxy:write(Pid, 20000, <<"fail">>)), @@ -114,6 +118,9 @@ multiple_chunks_read_test_() -> {"test", 30000, <<"fail">>, _}, {"test", 530000, <<"fail">>, _}], []}}, machi_file_proxy:read(Pid, 1024, 530000)), + ?_assertMatch({ok, {[{"test", 1, _, _}], [{"test", 0, 1}]}}, + machi_file_proxy:read(Pid, 0, 1024, + [{needs_trimmed, true}])), ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ].