From d3fe7ee181d4fdb2e86808174e0b4c0d57633e53 Mon Sep 17 00:00:00 2001 From: Mark Allen Date: Fri, 2 Oct 2015 16:26:42 -0500 Subject: [PATCH] Pull write-once files over to clean branch I am treating the original write-once branch as a prototype which I am now throwing away. I had too much work interleved in there, so I felt like the best thing to do would be to cut a new clean branch and pull the files over and start over against a recent-ish master. We will have to refactor the other things in FLU in a more piecemeal fashion. --- src/machi_file_proxy.erl | 778 +++++++++++++++++++++++++++++++++ src/machi_file_proxy_sup.erl | 46 ++ src/machi_flu_filename_mgr.erl | 167 +++++++ src/machi_flu_metadata_mgr.erl | 217 +++++++++ test/machi_file_proxy_eqc.erl | 328 ++++++++++++++ test/machi_file_proxy_test.erl | 103 +++++ 6 files changed, 1639 insertions(+) create mode 100644 src/machi_file_proxy.erl create mode 100644 src/machi_file_proxy_sup.erl create mode 100644 src/machi_flu_filename_mgr.erl create mode 100644 src/machi_flu_metadata_mgr.erl create mode 100644 test/machi_file_proxy_eqc.erl create mode 100644 test/machi_file_proxy_test.erl diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl new file mode 100644 index 0000000..6fb6b35 --- /dev/null +++ b/src/machi_file_proxy.erl @@ -0,0 +1,778 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc This is a proxy process which mediates access to Machi FLU +%% controlled files. In particular, it manages the "write-once register" +%% conceit at the heart of Machi's design. +%% +%% Read, write and append requests for a single file will be managed +%% through this proxy. Clients can also request syncs for specific +%% types of filehandles. +%% +%% As operations are requested, the proxy keeps track of how many +%% operations it has performed (and how many errors were generated.) +%% After a sufficient number of inactivity, the server terminates +%% itself. +%% +%% TODO: +%% 1. Some way to transition the proxy into a wedged state that +%% doesn't rely on message delivery. +%% +%% 2. Check max file size on appends. Writes we take on faith we can +%% and should handle. +%% +%% 3. Async checksum reads on startup. + +-module(machi_file_proxy). +-behaviour(gen_server). + +-include("machi.hrl"). + +%% public API +-export([ + start_link/2, + stop/1, + sync/1, + sync/2, + read/3, + write/3, + write/4, + append/2, + append/4 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + +-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds +-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown +-define(TIMEOUT, 10*1000). +-define(TOO_MANY_ERRORS_RATIO, 50). + +-type op_stats() :: { Total :: non_neg_integer(), + Errors :: non_neg_integer() }. + +-type byte_sequence() :: { Offset :: non_neg_integer(), + Size :: pos_integer()|infinity }. + +-record(state, { + data_dir :: string() | undefined, + filename :: string() | undefined, + data_path :: string() | undefined, + wedged = false :: boolean(), + csum_file :: string()|undefined, + csum_path :: string()|undefined, + eof_position = 0 :: non_neg_integer(), + unwritten_bytes = [] :: [byte_sequence()], + data_filehandle :: file:filehandle(), + csum_filehandle :: file:filehandle(), + tref :: reference(), %% timer ref + ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations + ops = 0 :: non_neg_integer(), %% sum of all ops + reads = {0, 0} :: op_stats(), + writes = {0, 0} :: op_stats(), + appends = {0, 0} :: op_stats() +}). + +%% Public API + +% @doc Start a new instance of the file proxy service. Takes the filename +% and data directory as arguments. This function is typically called by the +% `machi_file_proxy_sup:start_proxy/2' function. +-spec start_link(Filename :: string(), DataDir :: string()) -> any(). +start_link(Filename, DataDir) -> + gen_server:start_link(?MODULE, {Filename, DataDir}, []). + +% @doc Request to stop an instance of the file proxy service. +-spec stop(Pid :: pid()) -> ok. +stop(Pid) when is_pid(Pid) -> + gen_server:call(Pid, {stop}, ?TIMEOUT). + +% @doc Force a sync of all filehandles +-spec sync(Pid :: pid()) -> ok|{error, term()}. +sync(Pid) when is_pid(Pid) -> + sync(Pid, all); +sync(_Pid) -> + lager:warning("Bad pid to sync"), + {error, bad_arg}. + +% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'. +-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}. +sync(Pid, Type) when is_pid(Pid) andalso + ( Type =:= all orelse Type =:= csum orelse Type =:= data ) -> + gen_server:call(Pid, {sync, Type}, ?TIMEOUT); +sync(_Pid, Type) -> + lager:warning("Bad arg to sync: Type ~p", [Type]), + {error, bad_arg}. + +% @doc Read file at offset for length +-spec read(Pid :: pid(), + Offset :: non_neg_integer(), + Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} | + {error, Reason :: term()}. +read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 + andalso is_integer(Length) andalso Length > 0 -> + gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT); +read(_Pid, Offset, Length) -> + lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]), + {error, bad_arg}. + +% @doc Write data at offset +-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}. +write(Pid, Offset, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 + andalso is_binary(Data) -> + write(Pid, Offset, [], Data); +write(_Pid, Offset, _Data) -> + lager:warning("Bad arg to write: Offset ~p", [Offset]), + {error, bad_arg}. + +% @doc Write data at offset, including the client metadata. ClientMeta is a proplist +% that expects the following keys and values: +% +-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(), + Data :: binary()) -> ok|{error, term()}. +write(Pid, Offset, ClientMeta, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 + andalso is_list(ClientMeta) andalso is_binary(Data) -> + gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT); +write(_Pid, Offset, ClientMeta, _Data) -> + lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]), + {error, bad_arg}. + +% @doc Append data +-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} + |{error, term()}. +append(Pid, Data) when is_pid(Pid) andalso is_binary(Data) -> + append(Pid, [], 0, Data); +append(_Pid, _Data) -> + lager:warning("Bad arguments to append/2"), + {error, bad_arg}. + +% @doc Append data to file, supplying client metadata and (if desired) a +% reservation for additional space. ClientMeta is a proplist and expects the +% same keys as write/4. +-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(), + Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} + |{error, term()}. +append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta) + andalso is_integer(Extra) andalso Extra >= 0 + andalso is_binary(Data) -> + gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT); +append(_Pid, ClientMeta, Extra, _Data) -> + lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]), + {error, bad_arg}. + +%% gen_server callbacks + +% @private +init({Filename, DataDir}) -> + CsumFile = machi_util:make_checksum_filename(DataDir, Filename), + {_, DPath} = machi_util:make_data_filename(DataDir, Filename), + ok = filelib:ensure_dir(CsumFile), + ok = filelib:ensure_dir(DPath), + UnwrittenBytes = parse_csum_file(CsumFile), + {Eof, infinity} = lists:last(UnwrittenBytes), + {ok, FHd} = file:open(DPath, [read, write, binary, raw]), + {ok, FHc} = file:open(CsumFile, [append, binary, raw]), + Tref = schedule_tick(), + St = #state{ + filename = Filename, + data_dir = DataDir, + data_path = DPath, + csum_file = CsumFile, + data_filehandle = FHd, + csum_filehandle = FHc, + tref = Tref, + unwritten_bytes = UnwrittenBytes, + eof_position = Eof}, + lager:debug("Starting file proxy ~p for filename ~p, state = ~p", + [self(), Filename, St]), + {ok, St}. + +% @private +handle_call({stop}, _From, State) -> + lager:debug("Requested to stop."), + {stop, normal, State}; + +handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) -> + R = file:sync(FHd), + {reply, R, State}; + +handle_call({sync, csum}, _From, State = #state{ csum_filehandle = FHc }) -> + R = file:sync(FHc), + {reply, R, State}; + +handle_call({sync, all}, _From, State = #state{filename = F, + data_filehandle = FHd, + csum_filehandle = FHc + }) -> + R = file:sync(FHc), + R1 = file:sync(FHd), + Resp = case {R, R1} of + {ok, ok} -> ok; + {ok, O1} -> + lager:error("Got ~p during a data file sync on file ~p", [O1, F]), + O1; + {O2, ok} -> + lager:error("Got ~p during a csum file sync on file ~p", [O2, F]), + O2; + {O3, O4} -> + lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]), + {O3, O4} + end, + {reply, Resp, State}; + +%%% READS + +handle_call({read, _Offset, _Length}, _From, + State = #state{wedged = true, + reads = {T, Err} + }) -> + {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; + +handle_call({read, Offset, Length}, _From, + State = #state{eof_position = Eof, + reads = {T, Err} + }) when Offset + Length > Eof -> + lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p", + [Offset, Length, Eof]), + {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}}; + +handle_call({read, Offset, Length}, _From, + State = #state{filename = F, + data_filehandle = FH, + unwritten_bytes = U, + reads = {T, Err} + }) -> + + Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'! + + {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of + {ok, Bytes, Csum} -> + {{ok, Bytes, Csum}, Err}; + eof -> + {{error, not_written}, Err + 1}; + Error -> + {Error, Err + 1} + end, + {reply, Resp, State#state{reads = {T+1, NewErr}}}; + +%%% WRITES + +handle_call({write, _Offset, _ClientMeta, _Data}, _From, + State = #state{wedged = true, + writes = {T, Err} + }) -> + {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; + +handle_call({write, Offset, ClientMeta, Data}, _From, + State = #state{unwritten_bytes = U, + filename = F, + writes = {T, Err}, + data_filehandle = FHd, + csum_filehandle = FHc + }) -> + + ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), + ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), + + {Resp, NewErr, NewU} = + case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of + {error, {bad_csum, Bad}} -> + lager:error("Bad checksum on write; client sent ~p, we computed ~p", + [ClientCsum, Bad]), + {{error, bad_csum}, Err + 1, U}; + TaggedCsum -> + case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of + {ok, NewU1} -> + {ok, Err, NewU1}; + Error -> + {Error, Err + 1, U} + end + end, + {NewEof, infinity} = lists:last(NewU), + {reply, Resp, State#state{writes = {T+1, NewErr}, + eof_position = NewEof, + unwritten_bytes = NewU + }}; + +%% APPENDS + +handle_call({append, _ClientMeta, _Extra, _Data}, _From, + State = #state{wedged = true, + appends = {T, Err} + }) -> + {reply, {error, wedged}, State#state{appends = {T+1, Err+1}}}; + +handle_call({append, ClientMeta, Extra, Data}, _From, + State = #state{eof_position = EofP, + unwritten_bytes = U, + filename = F, + appends = {T, Err}, + data_filehandle = FHd, + csum_filehandle = FHc + }) -> + + ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), + ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), + + {Resp, NewErr, NewU} = + case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of + {error, {bad_csum, Bad}} -> + lager:error("Bad checksum; client sent ~p, we computed ~p", + [ClientCsum, Bad]), + {{error, bad_csum}, Err + 1, U}; + TaggedCsum -> + case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of + {ok, NewU1} -> + {{ok, F, EofP}, Err, NewU1}; + Error -> + {Error, Err + 1, EofP, U} + end + end, + {NewEof, infinity} = lists:last(NewU), + {reply, Resp, State#state{appends = {T+1, NewErr}, + eof_position = NewEof + Extra, + unwritten_bytes = NewU + }}; + +handle_call(Req, _From, State) -> + lager:warning("Unknown call: ~p", [Req]), + {reply, whoaaaaaaaaaaaa, State}. + +% @private +handle_cast(Cast, State) -> + lager:warning("Unknown cast: ~p", [Cast]), + {noreply, State}. + +% @private +handle_info(tick, State = #state{eof_position = Eof}) when Eof >= ?MAX_FILE_SIZE -> + lager:notice("Eof position ~p >= max file size ~p. Shutting down.", + [Eof, ?MAX_FILE_SIZE]), + {stop, file_rollover, State}; + +%% XXX Is this a good idea? Need to think this through a bit. +handle_info(tick, State = #state{wedged = true}) -> + {stop, wedged, State}; + +%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of +%% errors, we ought to shut down and give up our file descriptors. +handle_info(tick, State = #state{ + ops = Ops, + reads = {RT, RE}, + writes = {WT, WE}, + appends = {AT, AE} + }) when Ops > 100 andalso + trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO -> + Errors = RE + WE + AE, + lager:notice("Got ~p errors. Shutting down.", [Errors]), + {stop, too_many_errors, State}; + +handle_info(tick, State = #state{ + ticks = Ticks, + ops = Ops, + reads = {RT, _RE}, + writes = {WT, _WE}, + appends = {AT, _AE}}) when Ops == RT + WT + AT, Ticks == ?TICK_THRESHOLD -> + lager:debug("Got 5 ticks with no new activity. Shutting down."), + {stop, normal, State}; + +handle_info(tick, State = #state{ + ticks = Ticks, + ops = Ops, + reads = {RT, _RE}, + writes = {WT, _WE}, + appends = {AT, _AE}}) when Ops == RT + WT + AT -> + lager:debug("No new activity since last tick. Incrementing tick counter."), + Tref = schedule_tick(), + {noreply, State#state{tref = Tref, ticks = Ticks + 1}}; + +handle_info(tick, State = #state{ + reads = {RT, _RE}, + writes = {WT, _WE}, + appends = {AT, _AE} + }) -> + Ops = RT + WT + AT, + lager:debug("Setting ops counter to ~p", [Ops]), + Tref = schedule_tick(), + {noreply, State#state{tref = Tref, ops = Ops}}; + +%handle_info({wedged, EpochId} State = #state{epoch = E}) when E /= EpochId -> +% lager:notice("Wedge epoch ~p but ignoring because our epoch id is ~p", [EpochId, E]), +% {noreply, State}; + +%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E == EpochId -> +% lager:notice("Wedge epoch ~p same as our epoch id ~p; we are wedged. Bummer.", [EpochId, E]), +% {noreply, State#state{wedged = true}}; + +% flu1.erl: +% ProxyPid = get_proxy_pid(Filename), +% Are we wedged? if not +% machi_file_proxy:read(Pid, Offset, Length) +% otherwise -> error,wedged +% +% get_proxy_pid(Filename) -> +% Pid = lookup_pid(Filename) +% is_pid_alive(Pid) +% Pid +% if not alive then start one + +handle_info(Req, State) -> + lager:warning("Unknown info message: ~p", [Req]), + {noreply, State}. + +% @private +terminate(Reason, #state{filename = F, + data_filehandle = FHd, + csum_filehandle = FHc, + reads = {RT, RE}, + writes = {WT, WE}, + appends = {AT, AE} + }) -> + lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]), + lager:info(" Op Tot/Error", []), + lager:info(" Reads: ~p/~p", [RT, RE]), + lager:info(" Writes: ~p/~p", [WT, WE]), + lager:info("Appends: ~p/~p", [AT, AE]), + ok = file:sync(FHd), + ok = file:sync(FHc), + ok = file:close(FHd), + ok = file:close(FHc), + ok. + +% @private +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Private functions + +-spec schedule_tick() -> reference(). +schedule_tick() -> + erlang:send_after(?TICK, self(), tick). + +-spec check_or_make_tagged_csum(Type :: binary(), + Checksum :: binary(), + Data :: binary() ) -> binary() | + {error, {bad_csum, Bad :: binary()}}. +check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) -> + %% We are making a checksum here + Csum = machi_util:checksum_chunk(Data), + machi_util:make_tagged_csum(server_sha, Csum); +check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA; + Tag == ?CSUM_TAG_SERVER_SHA -> + Csum = machi_util:checksum_chunk(Data), + case Csum =:= InCsum of + true -> + machi_util:make_tagged_csum(server_sha, Csum); + false -> + {error, {bad_csum, Csum}} + end; +check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> + lager:warning("Unknown checksum tag ~p", [OtherTag]), + {error, bad_csum}. + +encode_csum_file_entry(Offset, Size, TaggedCSum) -> + Len = 8 + 4 + byte_size(TaggedCSum), + [<>, + TaggedCSum]. + +map_offsets_to_csums(CsumList) -> + lists:foreach(fun insert_offsets/1, CsumList). + +insert_offsets({Offset, Length, Checksum}) -> + put({Offset, Length}, Checksum). + +-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()]. +parse_csum_file(Filename) -> + %% using file:read_file works as long as the files are "small" + try + {ok, CsumData} = file:read_file(Filename), + {DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData), + Sort = lists:sort(DecodedCsums), + case Sort of + [] -> [{?MINIMUM_OFFSET, infinity}]; + _ -> + map_offsets_to_csums(DecodedCsums), + {First, _, _} = hd(Sort), + build_unwritten_bytes_list(Sort, First, []) + end + catch + _:{badmatch, {error, enoent}} -> + [{?MINIMUM_OFFSET, infinity}] + end. + +-spec handle_read(FHd :: file:filehandle(), + Filename :: string(), + TaggedCsum :: undefined|binary(), + Offset :: non_neg_integer(), + Size :: non_neg_integer(), + Unwritten :: [byte_sequence()] + ) -> {ok, Bytes :: binary(), Csum :: binary()} | + eof | + {error, bad_csum} | + {error, partial_read} | + {error, not_written} | + {error, Other :: term() }. +% @private Attempt a read operation on the given offset and length. +%
  • +%
      If the byte range is not yet written, `{error, not_written}' is +% returned.
    +%
      If the checksum given does not match what comes off the disk, +% `{error, bad_csum}' is returned.
    +%
      If the number of bytes that comes off the disk is not the requested length, +% `{error, partial_read}' is returned.
    +%
      If the offset is at or beyond the current file boundary, `eof' is returned.
    +%
      If some kind of POSIX error occurs, the OTP version of that POSIX error +% tuple is returned.
    +%
  • +% +% On success, `{ok, Bytes, Checksum}' is returned. +handle_read(FHd, Filename, undefined, Offset, Size, U) -> + handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U); + +handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) -> + case is_byte_range_unwritten(Offset, Size, U) of + true -> + {error, not_written}; + false -> + do_read(FHd, Filename, TaggedCsum, Offset, Size) + end. + +do_read(FHd, Filename, TaggedCsum, Offset, Size) -> + case file:pread(FHd, Offset, Size) of + eof -> + eof; + {ok, Bytes} when byte_size(Bytes) == Size -> + {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum), + case check_or_make_tagged_csum(Tag, Ck, Bytes) of + {error, Bad} -> + lager:error("Bad checksum; got ~p, expected ~p", + [Bad, Ck]), + {error, bad_csum}; + TaggedCsum -> + {ok, Bytes, TaggedCsum}; + %% XXX FIXME: Should we return something other than + %% {ok, ....} in this case? + OtherCsum when Tag =:= ?CSUM_TAG_NONE -> + {ok, Bytes, OtherCsum} + end; + {ok, Partial} -> + lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", + [Filename, Offset, Size, byte_size(Partial)]), + {error, partial_read}; + Other -> + lager:error("While reading file ~p, offset ~p, length ~p, got ~p", + [Filename, Offset, Size, Other]), + {error, Other} + end. + +-spec handle_write( FHd :: file:filehandle(), + FHc :: file:filehandle(), + Filename :: string(), + TaggedCsum :: binary(), + Offset :: non_neg_integer(), + Data :: binary(), + Unwritten :: [byte_sequence()] + ) -> {ok, NewU :: [byte_sequence()]} | + {error, written} | + {error, Reason :: term()}. +% @private Implements the write and append operation. The first task is to +% determine if the offset and data size has been written. If not, the write +% is allowed proceed. A special case is made when an offset and data size +% match a checksum. In that case we read the data off the disk, validate the +% checksum and return a "fake" ok response as if the write had been performed +% when it hasn't really. +% +% If a write proceeds, the offset, size and checksum are written to a metadata +% file, and the internal list of unwritten bytes is modified to reflect the +% just-performed write. This is then returned to the caller as +% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list. +handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) -> + Size = iolist_size(Data), + + case is_byte_range_unwritten(Offset, Size, U) of + false -> + case get({Offset, Size}) of + undefined -> + {error, written}; + TaggedCsum -> + case do_read(FHd, Filename, TaggedCsum, Offset, Size) of + eof -> + lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written", + [Offset, Filename]), + {error, server_insanity}; + {ok, _, _} -> + {ok, U}; + _ -> + {error, written} + end; + OtherCsum -> + %% Got a checksum, but it doesn't match the data block's + lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p", + [Offset, Filename, OtherCsum, TaggedCsum]), + {error, written} + end; + true -> + try + do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) + catch + %%% XXX FIXME: be more specific on badmatch that might + %%% occur around line 593 when we write the checksum + %%% file entry for the data blob we just put on the disk + error:Reason -> + {error, Reason} + end + end. + +% @private Implements the disk writes for both the write and append +% operation. +-spec do_write( FHd :: file:descriptor(), + FHc :: file:descriptor(), + Filename :: string(), + TaggedCsum :: binary(), + Offset :: non_neg_integer(), + Size :: non_neg_integer(), + Data :: binary(), + Unwritten :: [byte_sequence()] + ) -> {ok, NewUnwritten :: [byte_sequence()]} | + {error, Reason :: term()}. +do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) -> + case file:pwrite(FHd, Offset, Data) of + ok -> + lager:debug("Successful write in file ~p at offset ~p, length ~p", + [Filename, Offset, Size]), + EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum), + ok = file:write(FHc, EncodedCsum), + put({Offset, Size}, TaggedCsum), + NewU = update_unwritten(Offset, Size, U), + lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p", + [Filename, NewU]), + {ok, NewU}; + Other -> + lager:error("Got ~p during write to file ~p at offset ~p, length ~p", + [Other, Filename, Offset, Size]), + {error, Other} + end. + +-spec is_byte_range_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] ) -> boolean(). +% @private Given an offset and a size, return `true' if a byte range has +% not been written. Otherwise, return `false'. +is_byte_range_unwritten(Offset, Size, Unwritten) -> + case length(Unwritten) of + 0 -> + lager:critical("Unwritten byte list has 0 entries! This should never happen."), + false; + 1 -> + {Eof, infinity} = hd(Unwritten), + Offset >= Eof; + _ -> + case lookup_unwritten(Offset, Size, Unwritten) of + {ok, _} -> true; + not_found -> false + end + end. + +-spec lookup_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] + ) -> {ok, byte_sequence()} | not_found. +% @private Given an offset and a size, scan the list of unwritten bytes and +% look for a "hole" where a write might be allowed if any exist. If a +% suitable byte sequence is found, the function returns a tuple of {ok, +% {Position, Space}} is returned. `not_found' is returned if no suitable +% space is located. +lookup_unwritten(_Offset, _Size, []) -> + not_found; +lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos -> + {ok, H}; +lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest]) + when Offset >= Pos andalso Offset < Pos+Space + andalso Size =< (Space - (Offset - Pos)) -> + {ok, H}; +lookup_unwritten(Offset, Size, [_H|Rest]) -> + %% These are not the droids you're looking for. + lookup_unwritten(Offset, Size, Rest). + +%%% if the pos is greater than offset + size then we're done. End early. + +-spec update_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()]. +% @private Given an offset, a size and the unwritten byte list, return an updated +% and sorted unwritten byte list accounting for any completed write operation. +update_unwritten(Offset, Size, Unwritten) -> + case lookup_unwritten(Offset, Size, Unwritten) of + not_found -> + lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"), + Unwritten; + {ok, {Offset, Size}} -> + %% we neatly filled in our hole... + lists:keydelete(Offset, 1, Unwritten); + {ok, S={Pos, _}} -> + lists:sort(lists:keydelete(Pos, 1, Unwritten) ++ + update_byte_range(Offset, Size, S)) + end. + +-spec update_byte_range( Offset :: non_neg_integer(), + Size :: pos_integer(), + Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()]. +% @private Given an offset and size and a byte sequence tuple where a +% write took place, return a list of updates to the list of unwritten bytes +% accounting for the space occupied by the just completed write. +update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof -> + [{Offset + Size, infinity}]; +update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof -> + [{Eof, (Offset - Eof)}, {Offset+Size, infinity}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space -> + [{Offset + Size, Space - Size}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos -> + [{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}]. + + +-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(), + Size :: pos_integer(), + Checksum :: binary() }], + LastOffset :: non_neg_integer(), + Acc :: list() ) -> [byte_sequence()]. +% @private Given a sorted list of checksum data tuples, return a sorted +% list of unwritten byte ranges. The output list always has at least one +% entry: the last tuple in the list is guaranteed to be the current end of +% bytes written to a particular file with the special space moniker +% `infinity'. +build_unwritten_bytes_list([], Last, Acc) -> + NewAcc = [ {Last, infinity} | Acc ], + lists:reverse(NewAcc); +build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when + CurrentOffset /= LastOffset -> + Hole = CurrentOffset - LastOffset, + build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]); +build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) -> + build_unwritten_bytes_list(Rest, CO + CS, Acc). diff --git a/src/machi_file_proxy_sup.erl b/src/machi_file_proxy_sup.erl new file mode 100644 index 0000000..bd5cec2 --- /dev/null +++ b/src/machi_file_proxy_sup.erl @@ -0,0 +1,46 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc This is the main supervisor for the file proxies. +-module(machi_file_proxy_sup). +-behaviour(supervisor). + +%% public API +-export([ + start_link/0, + start_proxy/2 +]). + +%% supervisor callback +-export([ + init/1 +]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_proxy(Filename, DataDir) -> + supervisor:start_child(?MODULE, [Filename, DataDir]). + +init([]) -> + SupFlags = {simple_one_for_one, 1000, 10}, + ChildSpec = {unused, {machi_file_proxy, start_link, []}, + temporary, 2000, worker, [machi_file_proxy]}, + {ok, {SupFlags, [ChildSpec]}}. diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl new file mode 100644 index 0000000..c260421 --- /dev/null +++ b/src/machi_flu_filename_mgr.erl @@ -0,0 +1,167 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%% +%% @doc This process is responsible for managing filenames assigned to +%% prefixes. +%% +%% Supported operations include finding the "current" filename assigned to +%% a prefix. Incrementing the sequence number and returning a new file name +%% and listing all data files assigned to a given prefix. +%% +%% All prefixes should have the form of `{prefix, P}'. Single filename +%% return values have the form of `{file, F}'. +%% +%%

    Finding the current file associated with a sequence

    +%% First it looks up the sequence number from the prefix name. If +%% no sequence file is found, it uses 0 as the sequence number and searches +%% for a matching file with the prefix and 0 as the sequence number. +%% If no file is found, the it generates a new filename by incorporating +%% the given prefix, a randomly generated (v4) UUID and 0 as the +%% sequence number. +%% +%% If the sequence number is > 0, then the process scans the filesystem +%% looking for a filename which matches the prefix and given sequence number and +%% returns that. + +-module(machi_flu_filename_mgr). +-behavior(gen_server). + +-export([ + start_link/1, + find_or_make_filename_from_prefix/1, + increment_prefix_sequence/1, + list_files_by_prefix/1 + ]). + +%% gen_server callbacks +-export([ + init/1, + handle_cast/2, + handle_call/3, + handle_info/2, + terminate/2, + code_change/3 + ]). + +-define(TIMEOUT, 10 * 1000). + +%% public API +start_link(DataDir) when is_list(DataDir) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []). + +-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) -> + {file, Filename :: string()} | {error, Reason :: term() } | timeout. +% @doc Find the latest available or make a filename from a prefix. A prefix +% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged +% tuple in the form of `{file, F}' or an `{error, Reason}' +find_or_make_filename_from_prefix({prefix, Prefix}) -> + gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT); +find_or_make_filename_from_prefix(Other) -> + lager:error("~p is not a valid prefix.", [Other]), + error(badarg). + +-spec increment_prefix_sequence( Prefix :: {prefix, string()} ) -> + ok | {error, Reason :: term() } | timeout. +% @doc Increment the sequence counter for a given prefix. Prefix should +% be in the form of `{prefix, P}'. +increment_prefix_sequence({prefix, Prefix}) -> + gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT); +increment_prefix_sequence(Other) -> + lager:error("~p is not a valid prefix.", [Other]), + error(badarg). + +-spec list_files_by_prefix( Prefix :: {prefix, string()} ) -> + [ file:name() ] | timeout | {error, Reason :: term() }. +% @doc Given a prefix in the form of `{prefix, P}' return +% all the data files associated with that prefix. Returns +% a list. +list_files_by_prefix({prefix, Prefix}) -> + gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT); +list_files_by_prefix(Other) -> + lager:error("~p is not a valid prefix.", [Other]), + error(badarg). + +%% gen_server API +init([DataDir]) -> + {ok, DataDir}. + +handle_cast(Req, State) -> + lager:warning("Got unknown cast ~p", [Req]), + {noreply, State}. + +handle_call({find_filename, Prefix}, _From, DataDir) -> + N = machi_util:read_max_filenum(DataDir, Prefix), + File = case find_file(DataDir, Prefix, N) of + [] -> + {F, _} = machi_util:make_data_filename( + DataDir, + Prefix, + generate_uuid_v4_str(), + N), + F; + [H] -> H; + [Fn | _ ] = L -> + lager:warning( + "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p", + [Prefix, N, L]), + Fn + end, + {reply, {file, File}, DataDir}; +handle_call({increment_sequence, Prefix}, _From, DataDir) -> + ok = machi_util:increment_max_filenum(DataDir, Prefix), + {reply, ok, DataDir}; +handle_call({list_files, Prefix}, From, DataDir) -> + spawn(fun() -> + L = list_files(DataDir, Prefix), + gen_server:reply(From, L) + end), + {noreply, DataDir}; + +handle_call(Req, From, State) -> + lager:warning("Got unknown call ~p from ~p", [Req, From]), + {reply, hoge, State}. + +handle_info(Info, State) -> + lager:warning("Got unknown info ~p", [Info]), + {noreply, State}. + +terminate(Reason, _State) -> + lager:info("Shutting down because ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% private + +%% Quoted from https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl +%% MIT License +generate_uuid_v4_str() -> + <> = crypto:strong_rand_bytes(16), + io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", + [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). + +find_file(DataDir, Prefix, N) -> + {_Filename, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", N), + filelib:wildcard(Path). + +list_files(DataDir, Prefix) -> + {F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"), + filelib:wildcard(F, filename:dirname(Path)). diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl new file mode 100644 index 0000000..0fbafa9 --- /dev/null +++ b/src/machi_flu_metadata_mgr.erl @@ -0,0 +1,217 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc This is a metadata service for the machi FLU which currently +%% tracks the mappings between filenames and file proxies. +%% +%% The service takes a given hash space and spreads it out over a +%% pool of N processes which are responsible for 1/Nth the hash +%% space. When a user requests an operation on a particular file +%% the filename is hashed into the hash space and the request +%% forwarded to a particular manager responsible for that slice +%% of the hash space. +%% +%% The current hash implementation is `erlang:phash2/1' which has +%% a range between 0..2^27-1 or 134,217,727. + +-module(machi_flu_metadata_mgr). +-behaviour(gen_server). + + +-define(MAX_MGRS, 10). %% number of managers to start by default. +-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use +-define(TIMEOUT, 10 * 1000). %% 10 second timeout + +-record(state, {name :: atom(), + datadir :: string(), + tid :: ets:tid() + }). + +%% This record goes in the ets table where prefix is the key +-record(md, {filename :: string(), + proxy_pid :: undefined|pid(), + mref :: undefined|reference() %% monitor ref for file proxy + }). + +%% public api +-export([ + start_link/2, + lookup_manager_pid/1, + lookup_proxy_pid/1, + start_proxy_pid/1, + stop_proxy_pid/1 + ]). + +%% gen_server callbacks +-export([ + init/1, + handle_cast/2, + handle_call/3, + handle_info/2, + terminate/2, + code_change/3 + ]). + +%% Public API + +start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) -> + gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []). + +lookup_manager_pid({file, Filename}) -> + whereis(get_manager_atom(Filename)). + +lookup_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT). + +start_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT). + +stop_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). + +%% gen_server callbacks +init([Name, DataDir]) -> + Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]), + {ok, #state{ name = Name, datadir = DataDir, tid = Tid}}. + +handle_cast(Req, State) -> + lager:warning("Got unknown cast ~p", [Req]), + {noreply, State}. + +handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> + Reply = case lookup_md(Tid, Filename) of + not_found -> undefined; + R -> R#md.proxy_pid + end, + {reply, Reply, State}; + +handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) -> + NewR = case lookup_md(Tid, Filename) of + not_found -> + start_file_proxy(D, Filename); + #md{ proxy_pid = undefined } = R0 -> + start_file_proxy(D, R0); + #md{ proxy_pid = _Pid } = R1 -> + R1 + end, + update_ets(Tid, NewR), + {reply, {ok, NewR#md.proxy_pid}, State}; +handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> + case lookup_md(Tid, Filename) of + not_found -> + ok; + #md{ proxy_pid = undefined } -> + ok; + #md{ proxy_pid = Pid, mref = M } = R -> + demonitor(M, [flush]), + machi_file_proxy:stop(Pid), + update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }) + end, + {reply, ok, State}; + +handle_call(Req, From, State) -> + lager:warning("Got unknown call ~p from ~p", [Req, From]), + {reply, hoge, State}. + +handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) -> + lager:debug("file proxy ~p shutdown normally", [Pid]), + clear_ets(Tid, Mref), + {noreply, State}; + +handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ tid = Tid }) -> + lager:info("file proxy ~p shutdown because of file rollover", [Pid]), + R = get_md_record_by_mref(Tid, Mref), + [Prefix | _Rest] = machi_util:parse_filename({file, R#md.filename}), + + %% We only increment the counter here. The filename will be generated on the + %% next append request to that prefix and since the filename will have a new + %% sequence number it probably will be associated with a different metadata + %% manager. That's why we don't want to generate a new file name immediately + %% and use it to start a new file proxy. + ok = machi_flu_filename_mgr:increment_prefix_sequence({prefix, Prefix}), + + %% purge our ets table of this entry completely since it is likely the + %% new filename (whenever it comes) will be in a different manager than + %% us. + purge_ets(Tid, R), + {noreply, State}; + +handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) -> + lager:error("file proxy ~p shutdown because it's wedged", [Pid]), + clear_ets(Tid, Mref), + {noreply, State}; +handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) -> + lager:error("file proxy ~p shutdown because ~p", [Pid, Error]), + clear_ets(Tid, Mref), + {noreply, State}; + + +handle_info(Info, State) -> + lager:warning("Got unknown info ~p", [Info]), + {noreply, State}. + +terminate(Reason, _State) -> + lager:info("Shutting down because ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Private functions + +compute_hash(Data) -> + ?HASH(Data). + +compute_worker(Hash) -> + Hash rem ?MAX_MGRS. + +build_metadata_mgr_name(N) when is_integer(N) -> + list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)). + +get_manager_atom(Data) -> + build_metadata_mgr_name(compute_worker(compute_hash(Data))). + +lookup_md(Tid, Data) -> + case ets:lookup(Tid, Data) of + [] -> not_found; + [R] -> R + end. + +start_file_proxy(D, R = #md{filename = F} ) -> + {ok, Pid} = machi_file_proxy_sup:start_proxy(D, F), + Mref = monitor(process, Pid), + R#md{ proxy_pid = Pid, mref = Mref }; + +start_file_proxy(D, Filename) -> + start_file_proxy(D, #md{ filename = Filename }). + +update_ets(Tid, R) -> + ets:insert(Tid, R). + +clear_ets(Tid, Mref) -> + R = get_md_record_by_mref(Tid, Mref), + update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }). + +purge_ets(Tid, R) -> + ok = ets:delete_object(Tid, R). + +get_md_record_by_mref(Tid, Mref) -> + [R] = ets:match_object(Tid, {md, '_', '_', Mref}), + R. diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl new file mode 100644 index 0000000..39b0852 --- /dev/null +++ b/test/machi_file_proxy_eqc.erl @@ -0,0 +1,328 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_file_proxy_eqc). + +-ifdef(TEST). +-ifdef(EQC). +-compile(export_all). +-include("machi.hrl"). +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_statem.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). + + +%% EUNIT TEST DEFINITION +eqc_test_() -> + {timeout, 60, + {spawn, + [ + {timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))} + ] + }}. + +%% SHELL HELPERS +test() -> + test(100). + +test(N) -> + quickcheck(numtests(N, prop_ok())). + +check() -> + check(prop_ok(), current_counterexample()). + +%% GENERATORS + +csum_type() -> + elements([?CSUM_TAG_NONE, ?CSUM_TAG_CLIENT_SHA, ?CSUM_TAG_SERVER_SHA]). + +csum(Type, Binary) -> + case Type of + ?CSUM_TAG_NONE -> <<>>; + _ -> machi_util:checksum_chunk(Binary) + end. + +position(P) -> + ?LET(O, offset(), P + O). + +offset() -> + ?SUCHTHAT(X, int(), X >= 0). + +offset_base() -> + elements([4096, 6144, 7168, 8192, 20480, 100000, 1000000]). + +big_offset() -> + ?LET(P, int(), ?LET(X, offset_base(), P+X)). + +len() -> + ?SUCHTHAT(X, int(), X >= 1). + +data_with_csum() -> + ?LET({B,T},{eqc_gen:largebinary(), csum_type()}, {B,T, csum(T, B)}). + %?LET({B,T},{eqc_gen:binary(), csum_type()}, {B,T, csum(T, B)}). + +data_with_csum(Limit) -> + %?LET({B,T},{?LET(S, Limit, eqc_gen:largebinary(S)), csum_type()}, {B,T, csum(T, B)}). + ?LET({B,T},{?LET(S, Limit, eqc_gen:binary(S)), csum_type()}, {B,T, csum(T, B)}). + +intervals([]) -> + []; +intervals([N]) -> + [{N, choose(1,150)}]; +intervals([A,B|T]) -> + [{A, choose(1, B-A)}|intervals([B|T])]. + +interval_list() -> + ?LET(L, list(choose(1024, 4096)), intervals(lists:usort(L))). + +shuffle_interval() -> + ?LET(L, interval_list(), shuffle(L)). + +get_written_interval(L) -> + ?LET({O, Ln}, elements(L), {O+1, Ln-1}). + +%% INITIALIZATION + +-record(state, {pid, prev_extra = 0, planned_writes=[], written=[]}). + +initial_state() -> #state{written=[{0,1024}]}. +initial_state(I) -> #state{written=[{0,1024}], planned_writes=I}. + +weight(_S, rewrite) -> 1; +weight(_S, _) -> 2. + +%% HELPERS + +%% check if an operation is permitted based on whether a write has +%% occurred +check_writes(_Op, [], _Off, _L) -> + false; +check_writes(_Op, [{Pos, Sz}|_T], Off, L) when Pos == Off + andalso Sz == L -> + mostly_true; +check_writes(read, [{Pos, Sz}|_T], Off, L) when Off >= Pos + andalso Off < (Pos + Sz) + andalso Sz >= ( L - ( Off - Pos ) ) -> + true; +check_writes(write, [{Pos, Sz}|_T], Off, L) when ( Off + L ) > Pos + andalso Off < (Pos + Sz) -> + true; +check_writes(Op, [_H|T], Off, L) -> + check_writes(Op, T, Off, L). + +is_error({error, _}) -> true; +is_error({error, _, _}) -> true; +is_error(Other) -> {expected_ERROR, Other}. + +probably_error(ok) -> true; +probably_error(V) -> is_error(V). + +is_ok({ok, _, _}) -> true; +is_ok(ok) -> true; +is_ok(Other) -> {expected_OK, Other}. + +get_offset({ok, _Filename, Offset}) -> Offset; +get_offset(_) -> error(badarg). + +offset_valid(Offset, Extra, L) -> + {Pos, Sz} = lists:last(L), + Offset == Pos + Sz + Extra. + +-define(TESTDIR, "./eqc"). + +cleanup() -> + [begin + Fs = filelib:wildcard(?TESTDIR ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(?TESTDIR), + ok. + +%% start + +start_pre(S) -> + S#state.pid == undefined. + +start_command(S) -> + {call, ?MODULE, start, [S]}. + +start(_S) -> + {_, _, MS} = os:timestamp(), + File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS), + {ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR), + unlink(Pid), + Pid. + +start_next(S, Pid, _Args) -> + S#state{pid = Pid}. + +%% read + +read_pre(S) -> + S#state.pid /= undefined. + +read_args(S) -> + [S#state.pid, offset(), len()]. + +read_ok(S, Off, L) -> + case S#state.written of + [{0, 1024}] -> false; + W -> check_writes(read, W, Off, L) + end. + +read_post(S, [_Pid, Off, L], Res) -> + case read_ok(S, Off, L) of + true -> is_ok(Res); + mostly_true -> is_ok(Res); + false -> is_error(Res) + end. + +read_next(S, _Res, _Args) -> S. + +read(Pid, Offset, Length) -> + machi_file_proxy:read(Pid, Offset, Length). + +%% write + +write_pre(S) -> + S#state.pid /= undefined andalso S#state.planned_writes /= []. + +%% do not allow writes with empty data +write_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) -> + false; +write_pre(_S, _Args) -> + true. + +write_args(S) -> + {Off, Len} = hd(S#state.planned_writes), + [S#state.pid, Off, data_with_csum(Len)]. + +write_ok(_S, [_Pid, Off, _Data]) when Off < 1024 -> false; +write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) -> + Size = iolist_size(Bin), + %% Check writes checks if a byte range is *written* + %% So writes are ok IFF they are NOT written, so + %% we want not check_writes/3 to be true. + check_writes(write, S#state.written, Off, Size). + +write_post(S, Args, Res) -> + case write_ok(S, Args) of + %% false means this range has NOT been written before, so + %% it should succeed + false -> eq(Res, ok); + %% mostly true means we've written this range before BUT + %% as a special case if we get a call to write the EXACT + %% same data that's already on the disk, we return "ok" + %% instead of {error, written}. + mostly_true -> probably_error(Res); + %% If we get true, then we've already written this section + %% or a portion of this range to disk and should return an + %% error. + true -> is_error(Res) + end. + +write_next(S, Res, [_Pid, Offset, {Bin, _Tag, _Csum}]) -> + S0 = case is_ok(Res) of + true -> + S#state{written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}]) }; + _ -> + S + end, + S0#state{prev_extra = 0, planned_writes=tl(S0#state.planned_writes)}. + + +write(Pid, Offset, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:write(Pid, Offset, Meta, Bin). + +%% append + +append_pre(S) -> + S#state.pid /= undefined. + +%% do not allow appends with empty binary data +append_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) -> + false; +append_pre(_S, _Args) -> + true. + +append_args(S) -> + [S#state.pid, default(0, len()), data_with_csum()]. + +append(Pid, Extra, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:append(Pid, Meta, Extra, Bin). + +append_next(S, Res, [_Pid, Extra, {Bin, _Tag, _Csum}]) -> + case is_ok(Res) of + true -> + Offset = get_offset(Res), + true = offset_valid(Offset, S#state.prev_extra, S#state.written), + S#state{prev_extra = Extra, written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}])}; + _ -> + S + end. + +%% appends should always succeed unless the disk is full +%% or there's a hardware failure. +append_post(_S, _Args, Res) -> + true == is_ok(Res). + +%% rewrite + +rewrite_pre(S) -> + S#state.pid /= undefined andalso S#state.written /= []. + +rewrite_args(S) -> + ?LET({Off, Len}, get_written_interval(S#state.written), + [S#state.pid, Off, data_with_csum(Len)]). + +rewrite(Pid, Offset, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:write(Pid, Offset, Meta, Bin). + +rewrite_post(_S, _Args, Res) -> + is_error(Res). + +rewrite_next(S, _Res, _Args) -> + S#state{prev_extra = 0}. + +%% Property + +prop_ok() -> + cleanup(), + ?FORALL(I, shuffle_interval(), + ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I)), + begin + {H, S, Res} = run_parallel_commands(?MODULE, Cmds), + pretty_commands(?MODULE, Cmds, {H, S, Res}, + aggregate(command_names(Cmds), Res == ok)) + end) + ). + +-endif. % EQC +-endif. % TEST diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl new file mode 100644 index 0000000..bcb0085 --- /dev/null +++ b/test/machi_file_proxy_test.erl @@ -0,0 +1,103 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_file_proxy_test). + +-ifdef(TEST). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include("machi.hrl"). + +clean_up_data_dir(DataDir) -> + [begin + Fs = filelib:wildcard(DataDir ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(DataDir), + ok. + +-ifndef(PULSE). + +-define(TESTDIR, "./t"). +-define(HYOOGE, 1 * 1024 * 1024 * 1024). % 1 long GB + +random_binary_single() -> + %% OK, I guess it's not that random... + <<"Four score and seven years ago our fathers brought forth on this + continent a new nation, conceived in liberty, and dedicated to the + proposition that all men are created equal. + + Now we are engaged in a great civil war, testing whether that nation, or any + nation so conceived and so dedicated, can long endure. We are met on a great + battlefield of that war. We have come to dedicate a portion of that field, as a + final resting place for those who here gave their lives that that nation + might live. It is altogether fitting and proper that we should do this. + + But, in a larger sense, we can not dedicate, we can not consecrate, we can not + hallow this ground. The brave men, living and dead, who struggled here, have + consecrated it, far above our poor power to add or detract. The world will + little note, nor long remember what we say here, but it can never forget what + they did here. It is for us the living, rather, to be dedicated here to the + unfinished work which they who fought here have thus far so nobly advanced. It + is rather for us to be here dedicated to the great task remaining before us— + that from these honored dead we take increased devotion to that cause for which + they gave the last full measure of devotion— that we here highly resolve that + these dead shall not have died in vain— that this nation, under God, shall have + a new birth of freedom— and that government of the people, by the people, for + the people, shall not perish from the earth.">>. + +random_binary(Start, End) -> + Size = byte_size(random_binary_single()) - 1, + case End > Size of + true -> + Copies = ( End div Size ) + 1, + D0 = binary:copy(random_binary_single(), Copies), + binary:part(<>, Start, End); + false -> + binary:part(random_binary_single(), Start, End) + end. + +machi_file_proxy_test_() -> + clean_up_data_dir(?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), + [ + ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), + ?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), + ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) + ]. + +-endif. % !PULSE +-endif. % TEST. +