Pull write-once files over to clean branch

I am treating the original write-once branch as a prototype
which I am now throwing away. I had too much work interleved
in there, so I felt like the best thing to do would be to cut
a new clean branch and pull the files over and start over
against a recent-ish master.

We will have to refactor the other things in FLU in a more
piecemeal fashion.
This commit is contained in:
Mark Allen 2015-10-02 16:26:42 -05:00
parent d7daf203fb
commit d3fe7ee181
6 changed files with 1639 additions and 0 deletions

778
src/machi_file_proxy.erl Normal file
View file

@ -0,0 +1,778 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc This is a proxy process which mediates access to Machi FLU
%% controlled files. In particular, it manages the "write-once register"
%% conceit at the heart of Machi's design.
%%
%% Read, write and append requests for a single file will be managed
%% through this proxy. Clients can also request syncs for specific
%% types of filehandles.
%%
%% As operations are requested, the proxy keeps track of how many
%% operations it has performed (and how many errors were generated.)
%% After a sufficient number of inactivity, the server terminates
%% itself.
%%
%% TODO:
%% 1. Some way to transition the proxy into a wedged state that
%% doesn't rely on message delivery.
%%
%% 2. Check max file size on appends. Writes we take on faith we can
%% and should handle.
%%
%% 3. Async checksum reads on startup.
-module(machi_file_proxy).
-behaviour(gen_server).
-include("machi.hrl").
%% public API
-export([
start_link/2,
stop/1,
sync/1,
sync/2,
read/3,
write/3,
write/4,
append/2,
append/4
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).
-type op_stats() :: { Total :: non_neg_integer(),
Errors :: non_neg_integer() }.
-type byte_sequence() :: { Offset :: non_neg_integer(),
Size :: pos_integer()|infinity }.
-record(state, {
data_dir :: string() | undefined,
filename :: string() | undefined,
data_path :: string() | undefined,
wedged = false :: boolean(),
csum_file :: string()|undefined,
csum_path :: string()|undefined,
eof_position = 0 :: non_neg_integer(),
unwritten_bytes = [] :: [byte_sequence()],
data_filehandle :: file:filehandle(),
csum_filehandle :: file:filehandle(),
tref :: reference(), %% timer ref
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
ops = 0 :: non_neg_integer(), %% sum of all ops
reads = {0, 0} :: op_stats(),
writes = {0, 0} :: op_stats(),
appends = {0, 0} :: op_stats()
}).
%% Public API
% @doc Start a new instance of the file proxy service. Takes the filename
% and data directory as arguments. This function is typically called by the
% `machi_file_proxy_sup:start_proxy/2' function.
-spec start_link(Filename :: string(), DataDir :: string()) -> any().
start_link(Filename, DataDir) ->
gen_server:start_link(?MODULE, {Filename, DataDir}, []).
% @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
gen_server:call(Pid, {stop}, ?TIMEOUT).
% @doc Force a sync of all filehandles
-spec sync(Pid :: pid()) -> ok|{error, term()}.
sync(Pid) when is_pid(Pid) ->
sync(Pid, all);
sync(_Pid) ->
lager:warning("Bad pid to sync"),
{error, bad_arg}.
% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
sync(Pid, Type) when is_pid(Pid) andalso
( Type =:= all orelse Type =:= csum orelse Type =:= data ) ->
gen_server:call(Pid, {sync, Type}, ?TIMEOUT);
sync(_Pid, Type) ->
lager:warning("Bad arg to sync: Type ~p", [Type]),
{error, bad_arg}.
% @doc Read file at offset for length
-spec read(Pid :: pid(),
Offset :: non_neg_integer(),
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
{error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0 ->
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
read(_Pid, Offset, Length) ->
lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
{error, bad_arg}.
% @doc Write data at offset
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_binary(Data) ->
write(Pid, Offset, [], Data);
write(_Pid, Offset, _Data) ->
lager:warning("Bad arg to write: Offset ~p", [Offset]),
{error, bad_arg}.
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
% that expects the following keys and values:
% <ul>
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file</li>
% <li>`client_csum' - the checksum value from the client</li>
% </ul>
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, ClientMeta, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_list(ClientMeta) andalso is_binary(Data) ->
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT);
write(_Pid, Offset, ClientMeta, _Data) ->
lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]),
{error, bad_arg}.
% @doc Append data
-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
append(Pid, Data) when is_pid(Pid) andalso is_binary(Data) ->
append(Pid, [], 0, Data);
append(_Pid, _Data) ->
lager:warning("Bad arguments to append/2"),
{error, bad_arg}.
% @doc Append data to file, supplying client metadata and (if desired) a
% reservation for additional space. ClientMeta is a proplist and expects the
% same keys as write/4.
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta)
andalso is_integer(Extra) andalso Extra >= 0
andalso is_binary(Data) ->
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT);
append(_Pid, ClientMeta, Extra, _Data) ->
lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]),
{error, bad_arg}.
%% gen_server callbacks
% @private
init({Filename, DataDir}) ->
CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = filelib:ensure_dir(CsumFile),
ok = filelib:ensure_dir(DPath),
UnwrittenBytes = parse_csum_file(CsumFile),
{Eof, infinity} = lists:last(UnwrittenBytes),
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
Tref = schedule_tick(),
St = #state{
filename = Filename,
data_dir = DataDir,
data_path = DPath,
csum_file = CsumFile,
data_filehandle = FHd,
csum_filehandle = FHc,
tref = Tref,
unwritten_bytes = UnwrittenBytes,
eof_position = Eof},
lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
[self(), Filename, St]),
{ok, St}.
% @private
handle_call({stop}, _From, State) ->
lager:debug("Requested to stop."),
{stop, normal, State};
handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
R = file:sync(FHd),
{reply, R, State};
handle_call({sync, csum}, _From, State = #state{ csum_filehandle = FHc }) ->
R = file:sync(FHc),
{reply, R, State};
handle_call({sync, all}, _From, State = #state{filename = F,
data_filehandle = FHd,
csum_filehandle = FHc
}) ->
R = file:sync(FHc),
R1 = file:sync(FHd),
Resp = case {R, R1} of
{ok, ok} -> ok;
{ok, O1} ->
lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
O1;
{O2, ok} ->
lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
O2;
{O3, O4} ->
lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]),
{O3, O4}
end,
{reply, Resp, State};
%%% READS
handle_call({read, _Offset, _Length}, _From,
State = #state{wedged = true,
reads = {T, Err}
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From,
State = #state{eof_position = Eof,
reads = {T, Err}
}) when Offset + Length > Eof ->
lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
[Offset, Length, Eof]),
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From,
State = #state{filename = F,
data_filehandle = FH,
unwritten_bytes = U,
reads = {T, Err}
}) ->
Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'!
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
{ok, Bytes, Csum} ->
{{ok, Bytes, Csum}, Err};
eof ->
{{error, not_written}, Err + 1};
Error ->
{Error, Err + 1}
end,
{reply, Resp, State#state{reads = {T+1, NewErr}}};
%%% WRITES
handle_call({write, _Offset, _ClientMeta, _Data}, _From,
State = #state{wedged = true,
writes = {T, Err}
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({write, Offset, ClientMeta, Data}, _From,
State = #state{unwritten_bytes = U,
filename = F,
writes = {T, Err},
data_filehandle = FHd,
csum_filehandle = FHc
}) ->
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
{Resp, NewErr, NewU} =
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
{error, {bad_csum, Bad}} ->
lager:error("Bad checksum on write; client sent ~p, we computed ~p",
[ClientCsum, Bad]),
{{error, bad_csum}, Err + 1, U};
TaggedCsum ->
case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of
{ok, NewU1} ->
{ok, Err, NewU1};
Error ->
{Error, Err + 1, U}
end
end,
{NewEof, infinity} = lists:last(NewU),
{reply, Resp, State#state{writes = {T+1, NewErr},
eof_position = NewEof,
unwritten_bytes = NewU
}};
%% APPENDS
handle_call({append, _ClientMeta, _Extra, _Data}, _From,
State = #state{wedged = true,
appends = {T, Err}
}) ->
{reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};
handle_call({append, ClientMeta, Extra, Data}, _From,
State = #state{eof_position = EofP,
unwritten_bytes = U,
filename = F,
appends = {T, Err},
data_filehandle = FHd,
csum_filehandle = FHc
}) ->
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
{Resp, NewErr, NewU} =
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
{error, {bad_csum, Bad}} ->
lager:error("Bad checksum; client sent ~p, we computed ~p",
[ClientCsum, Bad]),
{{error, bad_csum}, Err + 1, U};
TaggedCsum ->
case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of
{ok, NewU1} ->
{{ok, F, EofP}, Err, NewU1};
Error ->
{Error, Err + 1, EofP, U}
end
end,
{NewEof, infinity} = lists:last(NewU),
{reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof + Extra,
unwritten_bytes = NewU
}};
handle_call(Req, _From, State) ->
lager:warning("Unknown call: ~p", [Req]),
{reply, whoaaaaaaaaaaaa, State}.
% @private
handle_cast(Cast, State) ->
lager:warning("Unknown cast: ~p", [Cast]),
{noreply, State}.
% @private
handle_info(tick, State = #state{eof_position = Eof}) when Eof >= ?MAX_FILE_SIZE ->
lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
[Eof, ?MAX_FILE_SIZE]),
{stop, file_rollover, State};
%% XXX Is this a good idea? Need to think this through a bit.
handle_info(tick, State = #state{wedged = true}) ->
{stop, wedged, State};
%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of
%% errors, we ought to shut down and give up our file descriptors.
handle_info(tick, State = #state{
ops = Ops,
reads = {RT, RE},
writes = {WT, WE},
appends = {AT, AE}
}) when Ops > 100 andalso
trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO ->
Errors = RE + WE + AE,
lager:notice("Got ~p errors. Shutting down.", [Errors]),
{stop, too_many_errors, State};
handle_info(tick, State = #state{
ticks = Ticks,
ops = Ops,
reads = {RT, _RE},
writes = {WT, _WE},
appends = {AT, _AE}}) when Ops == RT + WT + AT, Ticks == ?TICK_THRESHOLD ->
lager:debug("Got 5 ticks with no new activity. Shutting down."),
{stop, normal, State};
handle_info(tick, State = #state{
ticks = Ticks,
ops = Ops,
reads = {RT, _RE},
writes = {WT, _WE},
appends = {AT, _AE}}) when Ops == RT + WT + AT ->
lager:debug("No new activity since last tick. Incrementing tick counter."),
Tref = schedule_tick(),
{noreply, State#state{tref = Tref, ticks = Ticks + 1}};
handle_info(tick, State = #state{
reads = {RT, _RE},
writes = {WT, _WE},
appends = {AT, _AE}
}) ->
Ops = RT + WT + AT,
lager:debug("Setting ops counter to ~p", [Ops]),
Tref = schedule_tick(),
{noreply, State#state{tref = Tref, ops = Ops}};
%handle_info({wedged, EpochId} State = #state{epoch = E}) when E /= EpochId ->
% lager:notice("Wedge epoch ~p but ignoring because our epoch id is ~p", [EpochId, E]),
% {noreply, State};
%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E == EpochId ->
% lager:notice("Wedge epoch ~p same as our epoch id ~p; we are wedged. Bummer.", [EpochId, E]),
% {noreply, State#state{wedged = true}};
% flu1.erl:
% ProxyPid = get_proxy_pid(Filename),
% Are we wedged? if not
% machi_file_proxy:read(Pid, Offset, Length)
% otherwise -> error,wedged
%
% get_proxy_pid(Filename) ->
% Pid = lookup_pid(Filename)
% is_pid_alive(Pid)
% Pid
% if not alive then start one
handle_info(Req, State) ->
lager:warning("Unknown info message: ~p", [Req]),
{noreply, State}.
% @private
terminate(Reason, #state{filename = F,
data_filehandle = FHd,
csum_filehandle = FHc,
reads = {RT, RE},
writes = {WT, WE},
appends = {AT, AE}
}) ->
lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]),
lager:info(" Op Tot/Error", []),
lager:info(" Reads: ~p/~p", [RT, RE]),
lager:info(" Writes: ~p/~p", [WT, WE]),
lager:info("Appends: ~p/~p", [AT, AE]),
ok = file:sync(FHd),
ok = file:sync(FHc),
ok = file:close(FHd),
ok = file:close(FHc),
ok.
% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Private functions
-spec schedule_tick() -> reference().
schedule_tick() ->
erlang:send_after(?TICK, self(), tick).
-spec check_or_make_tagged_csum(Type :: binary(),
Checksum :: binary(),
Data :: binary() ) -> binary() |
{error, {bad_csum, Bad :: binary()}}.
check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
%% We are making a checksum here
Csum = machi_util:checksum_chunk(Data),
machi_util:make_tagged_csum(server_sha, Csum);
check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
Tag == ?CSUM_TAG_SERVER_SHA ->
Csum = machi_util:checksum_chunk(Data),
case Csum =:= InCsum of
true ->
machi_util:make_tagged_csum(server_sha, Csum);
false ->
{error, {bad_csum, Csum}}
end;
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
lager:warning("Unknown checksum tag ~p", [OtherTag]),
{error, bad_csum}.
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
Len = 8 + 4 + byte_size(TaggedCSum),
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
TaggedCSum].
map_offsets_to_csums(CsumList) ->
lists:foreach(fun insert_offsets/1, CsumList).
insert_offsets({Offset, Length, Checksum}) ->
put({Offset, Length}, Checksum).
-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
parse_csum_file(Filename) ->
%% using file:read_file works as long as the files are "small"
try
{ok, CsumData} = file:read_file(Filename),
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
Sort = lists:sort(DecodedCsums),
case Sort of
[] -> [{?MINIMUM_OFFSET, infinity}];
_ ->
map_offsets_to_csums(DecodedCsums),
{First, _, _} = hd(Sort),
build_unwritten_bytes_list(Sort, First, [])
end
catch
_:{badmatch, {error, enoent}} ->
[{?MINIMUM_OFFSET, infinity}]
end.
-spec handle_read(FHd :: file:filehandle(),
Filename :: string(),
TaggedCsum :: undefined|binary(),
Offset :: non_neg_integer(),
Size :: non_neg_integer(),
Unwritten :: [byte_sequence()]
) -> {ok, Bytes :: binary(), Csum :: binary()} |
eof |
{error, bad_csum} |
{error, partial_read} |
{error, not_written} |
{error, Other :: term() }.
% @private Attempt a read operation on the given offset and length.
% <li>
% <ul> If the byte range is not yet written, `{error, not_written}' is
% returned.</ul>
% <ul> If the checksum given does not match what comes off the disk,
% `{error, bad_csum}' is returned.</ul>
% <ul> If the number of bytes that comes off the disk is not the requested length,
% `{error, partial_read}' is returned.</ul>
% <ul> If the offset is at or beyond the current file boundary, `eof' is returned.</ul>
% <ul> If some kind of POSIX error occurs, the OTP version of that POSIX error
% tuple is returned.</ul>
% </li>
%
% On success, `{ok, Bytes, Checksum}' is returned.
handle_read(FHd, Filename, undefined, Offset, Size, U) ->
handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U);
handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
case is_byte_range_unwritten(Offset, Size, U) of
true ->
{error, not_written};
false ->
do_read(FHd, Filename, TaggedCsum, Offset, Size)
end.
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
case file:pread(FHd, Offset, Size) of
eof ->
eof;
{ok, Bytes} when byte_size(Bytes) == Size ->
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
case check_or_make_tagged_csum(Tag, Ck, Bytes) of
{error, Bad} ->
lager:error("Bad checksum; got ~p, expected ~p",
[Bad, Ck]),
{error, bad_csum};
TaggedCsum ->
{ok, Bytes, TaggedCsum};
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
{ok, Bytes, OtherCsum}
end;
{ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
[Filename, Offset, Size, byte_size(Partial)]),
{error, partial_read};
Other ->
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
[Filename, Offset, Size, Other]),
{error, Other}
end.
-spec handle_write( FHd :: file:filehandle(),
FHc :: file:filehandle(),
Filename :: string(),
TaggedCsum :: binary(),
Offset :: non_neg_integer(),
Data :: binary(),
Unwritten :: [byte_sequence()]
) -> {ok, NewU :: [byte_sequence()]} |
{error, written} |
{error, Reason :: term()}.
% @private Implements the write and append operation. The first task is to
% determine if the offset and data size has been written. If not, the write
% is allowed proceed. A special case is made when an offset and data size
% match a checksum. In that case we read the data off the disk, validate the
% checksum and return a "fake" ok response as if the write had been performed
% when it hasn't really.
%
% If a write proceeds, the offset, size and checksum are written to a metadata
% file, and the internal list of unwritten bytes is modified to reflect the
% just-performed write. This is then returned to the caller as
% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
Size = iolist_size(Data),
case is_byte_range_unwritten(Offset, Size, U) of
false ->
case get({Offset, Size}) of
undefined ->
{error, written};
TaggedCsum ->
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
eof ->
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
[Offset, Filename]),
{error, server_insanity};
{ok, _, _} ->
{ok, U};
_ ->
{error, written}
end;
OtherCsum ->
%% Got a checksum, but it doesn't match the data block's
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]),
{error, written}
end;
true ->
try
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
catch
%%% XXX FIXME: be more specific on badmatch that might
%%% occur around line 593 when we write the checksum
%%% file entry for the data blob we just put on the disk
error:Reason ->
{error, Reason}
end
end.
% @private Implements the disk writes for both the write and append
% operation.
-spec do_write( FHd :: file:descriptor(),
FHc :: file:descriptor(),
Filename :: string(),
TaggedCsum :: binary(),
Offset :: non_neg_integer(),
Size :: non_neg_integer(),
Data :: binary(),
Unwritten :: [byte_sequence()]
) -> {ok, NewUnwritten :: [byte_sequence()]} |
{error, Reason :: term()}.
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) ->
case file:pwrite(FHd, Offset, Data) of
ok ->
lager:debug("Successful write in file ~p at offset ~p, length ~p",
[Filename, Offset, Size]),
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
ok = file:write(FHc, EncodedCsum),
put({Offset, Size}, TaggedCsum),
NewU = update_unwritten(Offset, Size, U),
lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
[Filename, NewU]),
{ok, NewU};
Other ->
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
[Other, Filename, Offset, Size]),
{error, Other}
end.
-spec is_byte_range_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()] ) -> boolean().
% @private Given an offset and a size, return `true' if a byte range has
% <b>not</b> been written. Otherwise, return `false'.
is_byte_range_unwritten(Offset, Size, Unwritten) ->
case length(Unwritten) of
0 ->
lager:critical("Unwritten byte list has 0 entries! This should never happen."),
false;
1 ->
{Eof, infinity} = hd(Unwritten),
Offset >= Eof;
_ ->
case lookup_unwritten(Offset, Size, Unwritten) of
{ok, _} -> true;
not_found -> false
end
end.
-spec lookup_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()]
) -> {ok, byte_sequence()} | not_found.
% @private Given an offset and a size, scan the list of unwritten bytes and
% look for a "hole" where a write might be allowed if any exist. If a
% suitable byte sequence is found, the function returns a tuple of {ok,
% {Position, Space}} is returned. `not_found' is returned if no suitable
% space is located.
lookup_unwritten(_Offset, _Size, []) ->
not_found;
lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos ->
{ok, H};
lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest])
when Offset >= Pos andalso Offset < Pos+Space
andalso Size =< (Space - (Offset - Pos)) ->
{ok, H};
lookup_unwritten(Offset, Size, [_H|Rest]) ->
%% These are not the droids you're looking for.
lookup_unwritten(Offset, Size, Rest).
%%% if the pos is greater than offset + size then we're done. End early.
-spec update_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].
% @private Given an offset, a size and the unwritten byte list, return an updated
% and sorted unwritten byte list accounting for any completed write operation.
update_unwritten(Offset, Size, Unwritten) ->
case lookup_unwritten(Offset, Size, Unwritten) of
not_found ->
lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"),
Unwritten;
{ok, {Offset, Size}} ->
%% we neatly filled in our hole...
lists:keydelete(Offset, 1, Unwritten);
{ok, S={Pos, _}} ->
lists:sort(lists:keydelete(Pos, 1, Unwritten) ++
update_byte_range(Offset, Size, S))
end.
-spec update_byte_range( Offset :: non_neg_integer(),
Size :: pos_integer(),
Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()].
% @private Given an offset and size and a byte sequence tuple where a
% write took place, return a list of updates to the list of unwritten bytes
% accounting for the space occupied by the just completed write.
update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof ->
[{Offset + Size, infinity}];
update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof ->
[{Eof, (Offset - Eof)}, {Offset+Size, infinity}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space ->
[{Offset + Size, Space - Size}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos ->
[{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}].
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
Size :: pos_integer(),
Checksum :: binary() }],
LastOffset :: non_neg_integer(),
Acc :: list() ) -> [byte_sequence()].
% @private Given a <b>sorted</b> list of checksum data tuples, return a sorted
% list of unwritten byte ranges. The output list <b>always</b> has at least one
% entry: the last tuple in the list is guaranteed to be the current end of
% bytes written to a particular file with the special space moniker
% `infinity'.
build_unwritten_bytes_list([], Last, Acc) ->
NewAcc = [ {Last, infinity} | Acc ],
lists:reverse(NewAcc);
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when
CurrentOffset /= LastOffset ->
Hole = CurrentOffset - LastOffset,
build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
build_unwritten_bytes_list(Rest, CO + CS, Acc).

View file

@ -0,0 +1,46 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc This is the main supervisor for the file proxies.
-module(machi_file_proxy_sup).
-behaviour(supervisor).
%% public API
-export([
start_link/0,
start_proxy/2
]).
%% supervisor callback
-export([
init/1
]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_proxy(Filename, DataDir) ->
supervisor:start_child(?MODULE, [Filename, DataDir]).
init([]) ->
SupFlags = {simple_one_for_one, 1000, 10},
ChildSpec = {unused, {machi_file_proxy, start_link, []},
temporary, 2000, worker, [machi_file_proxy]},
{ok, {SupFlags, [ChildSpec]}}.

View file

@ -0,0 +1,167 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%
%% @doc This process is responsible for managing filenames assigned to
%% prefixes.
%%
%% Supported operations include finding the "current" filename assigned to
%% a prefix. Incrementing the sequence number and returning a new file name
%% and listing all data files assigned to a given prefix.
%%
%% All prefixes should have the form of `{prefix, P}'. Single filename
%% return values have the form of `{file, F}'.
%%
%% <h2>Finding the current file associated with a sequence</h2>
%% First it looks up the sequence number from the prefix name. If
%% no sequence file is found, it uses 0 as the sequence number and searches
%% for a matching file with the prefix and 0 as the sequence number.
%% If no file is found, the it generates a new filename by incorporating
%% the given prefix, a randomly generated (v4) UUID and 0 as the
%% sequence number.
%%
%% If the sequence number is > 0, then the process scans the filesystem
%% looking for a filename which matches the prefix and given sequence number and
%% returns that.
-module(machi_flu_filename_mgr).
-behavior(gen_server).
-export([
start_link/1,
find_or_make_filename_from_prefix/1,
increment_prefix_sequence/1,
list_files_by_prefix/1
]).
%% gen_server callbacks
-export([
init/1,
handle_cast/2,
handle_call/3,
handle_info/2,
terminate/2,
code_change/3
]).
-define(TIMEOUT, 10 * 1000).
%% public API
start_link(DataDir) when is_list(DataDir) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []).
-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) ->
{file, Filename :: string()} | {error, Reason :: term() } | timeout.
% @doc Find the latest available or make a filename from a prefix. A prefix
% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
% tuple in the form of `{file, F}' or an `{error, Reason}'
find_or_make_filename_from_prefix({prefix, Prefix}) ->
gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT);
find_or_make_filename_from_prefix(Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
-spec increment_prefix_sequence( Prefix :: {prefix, string()} ) ->
ok | {error, Reason :: term() } | timeout.
% @doc Increment the sequence counter for a given prefix. Prefix should
% be in the form of `{prefix, P}'.
increment_prefix_sequence({prefix, Prefix}) ->
gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT);
increment_prefix_sequence(Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
-spec list_files_by_prefix( Prefix :: {prefix, string()} ) ->
[ file:name() ] | timeout | {error, Reason :: term() }.
% @doc Given a prefix in the form of `{prefix, P}' return
% all the data files associated with that prefix. Returns
% a list.
list_files_by_prefix({prefix, Prefix}) ->
gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT);
list_files_by_prefix(Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
%% gen_server API
init([DataDir]) ->
{ok, DataDir}.
handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]),
{noreply, State}.
handle_call({find_filename, Prefix}, _From, DataDir) ->
N = machi_util:read_max_filenum(DataDir, Prefix),
File = case find_file(DataDir, Prefix, N) of
[] ->
{F, _} = machi_util:make_data_filename(
DataDir,
Prefix,
generate_uuid_v4_str(),
N),
F;
[H] -> H;
[Fn | _ ] = L ->
lager:warning(
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
[Prefix, N, L]),
Fn
end,
{reply, {file, File}, DataDir};
handle_call({increment_sequence, Prefix}, _From, DataDir) ->
ok = machi_util:increment_max_filenum(DataDir, Prefix),
{reply, ok, DataDir};
handle_call({list_files, Prefix}, From, DataDir) ->
spawn(fun() ->
L = list_files(DataDir, Prefix),
gen_server:reply(From, L)
end),
{noreply, DataDir};
handle_call(Req, From, State) ->
lager:warning("Got unknown call ~p from ~p", [Req, From]),
{reply, hoge, State}.
handle_info(Info, State) ->
lager:warning("Got unknown info ~p", [Info]),
{noreply, State}.
terminate(Reason, _State) ->
lager:info("Shutting down because ~p", [Reason]),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% private
%% Quoted from https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
%% MIT License
generate_uuid_v4_str() ->
<<A:32, B:16, C:16, D:16, E:48>> = crypto:strong_rand_bytes(16),
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
find_file(DataDir, Prefix, N) ->
{_Filename, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", N),
filelib:wildcard(Path).
list_files(DataDir, Prefix) ->
{F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"),
filelib:wildcard(F, filename:dirname(Path)).

View file

@ -0,0 +1,217 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc This is a metadata service for the machi FLU which currently
%% tracks the mappings between filenames and file proxies.
%%
%% The service takes a given hash space and spreads it out over a
%% pool of N processes which are responsible for 1/Nth the hash
%% space. When a user requests an operation on a particular file
%% the filename is hashed into the hash space and the request
%% forwarded to a particular manager responsible for that slice
%% of the hash space.
%%
%% The current hash implementation is `erlang:phash2/1' which has
%% a range between 0..2^27-1 or 134,217,727.
-module(machi_flu_metadata_mgr).
-behaviour(gen_server).
-define(MAX_MGRS, 10). %% number of managers to start by default.
-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
-define(TIMEOUT, 10 * 1000). %% 10 second timeout
-record(state, {name :: atom(),
datadir :: string(),
tid :: ets:tid()
}).
%% This record goes in the ets table where prefix is the key
-record(md, {filename :: string(),
proxy_pid :: undefined|pid(),
mref :: undefined|reference() %% monitor ref for file proxy
}).
%% public api
-export([
start_link/2,
lookup_manager_pid/1,
lookup_proxy_pid/1,
start_proxy_pid/1,
stop_proxy_pid/1
]).
%% gen_server callbacks
-export([
init/1,
handle_cast/2,
handle_call/3,
handle_info/2,
terminate/2,
code_change/3
]).
%% Public API
start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) ->
gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []).
lookup_manager_pid({file, Filename}) ->
whereis(get_manager_atom(Filename)).
lookup_proxy_pid({file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT).
start_proxy_pid({file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT).
stop_proxy_pid({file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
%% gen_server callbacks
init([Name, DataDir]) ->
Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
{ok, #state{ name = Name, datadir = DataDir, tid = Tid}}.
handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]),
{noreply, State}.
handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
Reply = case lookup_md(Tid, Filename) of
not_found -> undefined;
R -> R#md.proxy_pid
end,
{reply, Reply, State};
handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) ->
NewR = case lookup_md(Tid, Filename) of
not_found ->
start_file_proxy(D, Filename);
#md{ proxy_pid = undefined } = R0 ->
start_file_proxy(D, R0);
#md{ proxy_pid = _Pid } = R1 ->
R1
end,
update_ets(Tid, NewR),
{reply, {ok, NewR#md.proxy_pid}, State};
handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
case lookup_md(Tid, Filename) of
not_found ->
ok;
#md{ proxy_pid = undefined } ->
ok;
#md{ proxy_pid = Pid, mref = M } = R ->
demonitor(M, [flush]),
machi_file_proxy:stop(Pid),
update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined })
end,
{reply, ok, State};
handle_call(Req, From, State) ->
lager:warning("Got unknown call ~p from ~p", [Req, From]),
{reply, hoge, State}.
handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) ->
lager:debug("file proxy ~p shutdown normally", [Pid]),
clear_ets(Tid, Mref),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ tid = Tid }) ->
lager:info("file proxy ~p shutdown because of file rollover", [Pid]),
R = get_md_record_by_mref(Tid, Mref),
[Prefix | _Rest] = machi_util:parse_filename({file, R#md.filename}),
%% We only increment the counter here. The filename will be generated on the
%% next append request to that prefix and since the filename will have a new
%% sequence number it probably will be associated with a different metadata
%% manager. That's why we don't want to generate a new file name immediately
%% and use it to start a new file proxy.
ok = machi_flu_filename_mgr:increment_prefix_sequence({prefix, Prefix}),
%% purge our ets table of this entry completely since it is likely the
%% new filename (whenever it comes) will be in a different manager than
%% us.
purge_ets(Tid, R),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) ->
lager:error("file proxy ~p shutdown because it's wedged", [Pid]),
clear_ets(Tid, Mref),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) ->
lager:error("file proxy ~p shutdown because ~p", [Pid, Error]),
clear_ets(Tid, Mref),
{noreply, State};
handle_info(Info, State) ->
lager:warning("Got unknown info ~p", [Info]),
{noreply, State}.
terminate(Reason, _State) ->
lager:info("Shutting down because ~p", [Reason]),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Private functions
compute_hash(Data) ->
?HASH(Data).
compute_worker(Hash) ->
Hash rem ?MAX_MGRS.
build_metadata_mgr_name(N) when is_integer(N) ->
list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)).
get_manager_atom(Data) ->
build_metadata_mgr_name(compute_worker(compute_hash(Data))).
lookup_md(Tid, Data) ->
case ets:lookup(Tid, Data) of
[] -> not_found;
[R] -> R
end.
start_file_proxy(D, R = #md{filename = F} ) ->
{ok, Pid} = machi_file_proxy_sup:start_proxy(D, F),
Mref = monitor(process, Pid),
R#md{ proxy_pid = Pid, mref = Mref };
start_file_proxy(D, Filename) ->
start_file_proxy(D, #md{ filename = Filename }).
update_ets(Tid, R) ->
ets:insert(Tid, R).
clear_ets(Tid, Mref) ->
R = get_md_record_by_mref(Tid, Mref),
update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }).
purge_ets(Tid, R) ->
ok = ets:delete_object(Tid, R).
get_md_record_by_mref(Tid, Mref) ->
[R] = ets:match_object(Tid, {md, '_', '_', Mref}),
R.

View file

@ -0,0 +1,328 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_file_proxy_eqc).
-ifdef(TEST).
-ifdef(EQC).
-compile(export_all).
-include("machi.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
%% EUNIT TEST DEFINITION
eqc_test_() ->
{timeout, 60,
{spawn,
[
{timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))}
]
}}.
%% SHELL HELPERS
test() ->
test(100).
test(N) ->
quickcheck(numtests(N, prop_ok())).
check() ->
check(prop_ok(), current_counterexample()).
%% GENERATORS
csum_type() ->
elements([?CSUM_TAG_NONE, ?CSUM_TAG_CLIENT_SHA, ?CSUM_TAG_SERVER_SHA]).
csum(Type, Binary) ->
case Type of
?CSUM_TAG_NONE -> <<>>;
_ -> machi_util:checksum_chunk(Binary)
end.
position(P) ->
?LET(O, offset(), P + O).
offset() ->
?SUCHTHAT(X, int(), X >= 0).
offset_base() ->
elements([4096, 6144, 7168, 8192, 20480, 100000, 1000000]).
big_offset() ->
?LET(P, int(), ?LET(X, offset_base(), P+X)).
len() ->
?SUCHTHAT(X, int(), X >= 1).
data_with_csum() ->
?LET({B,T},{eqc_gen:largebinary(), csum_type()}, {B,T, csum(T, B)}).
%?LET({B,T},{eqc_gen:binary(), csum_type()}, {B,T, csum(T, B)}).
data_with_csum(Limit) ->
%?LET({B,T},{?LET(S, Limit, eqc_gen:largebinary(S)), csum_type()}, {B,T, csum(T, B)}).
?LET({B,T},{?LET(S, Limit, eqc_gen:binary(S)), csum_type()}, {B,T, csum(T, B)}).
intervals([]) ->
[];
intervals([N]) ->
[{N, choose(1,150)}];
intervals([A,B|T]) ->
[{A, choose(1, B-A)}|intervals([B|T])].
interval_list() ->
?LET(L, list(choose(1024, 4096)), intervals(lists:usort(L))).
shuffle_interval() ->
?LET(L, interval_list(), shuffle(L)).
get_written_interval(L) ->
?LET({O, Ln}, elements(L), {O+1, Ln-1}).
%% INITIALIZATION
-record(state, {pid, prev_extra = 0, planned_writes=[], written=[]}).
initial_state() -> #state{written=[{0,1024}]}.
initial_state(I) -> #state{written=[{0,1024}], planned_writes=I}.
weight(_S, rewrite) -> 1;
weight(_S, _) -> 2.
%% HELPERS
%% check if an operation is permitted based on whether a write has
%% occurred
check_writes(_Op, [], _Off, _L) ->
false;
check_writes(_Op, [{Pos, Sz}|_T], Off, L) when Pos == Off
andalso Sz == L ->
mostly_true;
check_writes(read, [{Pos, Sz}|_T], Off, L) when Off >= Pos
andalso Off < (Pos + Sz)
andalso Sz >= ( L - ( Off - Pos ) ) ->
true;
check_writes(write, [{Pos, Sz}|_T], Off, L) when ( Off + L ) > Pos
andalso Off < (Pos + Sz) ->
true;
check_writes(Op, [_H|T], Off, L) ->
check_writes(Op, T, Off, L).
is_error({error, _}) -> true;
is_error({error, _, _}) -> true;
is_error(Other) -> {expected_ERROR, Other}.
probably_error(ok) -> true;
probably_error(V) -> is_error(V).
is_ok({ok, _, _}) -> true;
is_ok(ok) -> true;
is_ok(Other) -> {expected_OK, Other}.
get_offset({ok, _Filename, Offset}) -> Offset;
get_offset(_) -> error(badarg).
offset_valid(Offset, Extra, L) ->
{Pos, Sz} = lists:last(L),
Offset == Pos + Sz + Extra.
-define(TESTDIR, "./eqc").
cleanup() ->
[begin
Fs = filelib:wildcard(?TESTDIR ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(?TESTDIR),
ok.
%% start
start_pre(S) ->
S#state.pid == undefined.
start_command(S) ->
{call, ?MODULE, start, [S]}.
start(_S) ->
{_, _, MS} = os:timestamp(),
File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
{ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR),
unlink(Pid),
Pid.
start_next(S, Pid, _Args) ->
S#state{pid = Pid}.
%% read
read_pre(S) ->
S#state.pid /= undefined.
read_args(S) ->
[S#state.pid, offset(), len()].
read_ok(S, Off, L) ->
case S#state.written of
[{0, 1024}] -> false;
W -> check_writes(read, W, Off, L)
end.
read_post(S, [_Pid, Off, L], Res) ->
case read_ok(S, Off, L) of
true -> is_ok(Res);
mostly_true -> is_ok(Res);
false -> is_error(Res)
end.
read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) ->
machi_file_proxy:read(Pid, Offset, Length).
%% write
write_pre(S) ->
S#state.pid /= undefined andalso S#state.planned_writes /= [].
%% do not allow writes with empty data
write_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) ->
false;
write_pre(_S, _Args) ->
true.
write_args(S) ->
{Off, Len} = hd(S#state.planned_writes),
[S#state.pid, Off, data_with_csum(Len)].
write_ok(_S, [_Pid, Off, _Data]) when Off < 1024 -> false;
write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) ->
Size = iolist_size(Bin),
%% Check writes checks if a byte range is *written*
%% So writes are ok IFF they are NOT written, so
%% we want not check_writes/3 to be true.
check_writes(write, S#state.written, Off, Size).
write_post(S, Args, Res) ->
case write_ok(S, Args) of
%% false means this range has NOT been written before, so
%% it should succeed
false -> eq(Res, ok);
%% mostly true means we've written this range before BUT
%% as a special case if we get a call to write the EXACT
%% same data that's already on the disk, we return "ok"
%% instead of {error, written}.
mostly_true -> probably_error(Res);
%% If we get true, then we've already written this section
%% or a portion of this range to disk and should return an
%% error.
true -> is_error(Res)
end.
write_next(S, Res, [_Pid, Offset, {Bin, _Tag, _Csum}]) ->
S0 = case is_ok(Res) of
true ->
S#state{written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}]) };
_ ->
S
end,
S0#state{prev_extra = 0, planned_writes=tl(S0#state.planned_writes)}.
write(Pid, Offset, {Bin, Tag, Csum}) ->
Meta = [{client_csum_tag, Tag},
{client_csum, Csum}],
machi_file_proxy:write(Pid, Offset, Meta, Bin).
%% append
append_pre(S) ->
S#state.pid /= undefined.
%% do not allow appends with empty binary data
append_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) ->
false;
append_pre(_S, _Args) ->
true.
append_args(S) ->
[S#state.pid, default(0, len()), data_with_csum()].
append(Pid, Extra, {Bin, Tag, Csum}) ->
Meta = [{client_csum_tag, Tag},
{client_csum, Csum}],
machi_file_proxy:append(Pid, Meta, Extra, Bin).
append_next(S, Res, [_Pid, Extra, {Bin, _Tag, _Csum}]) ->
case is_ok(Res) of
true ->
Offset = get_offset(Res),
true = offset_valid(Offset, S#state.prev_extra, S#state.written),
S#state{prev_extra = Extra, written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}])};
_ ->
S
end.
%% appends should always succeed unless the disk is full
%% or there's a hardware failure.
append_post(_S, _Args, Res) ->
true == is_ok(Res).
%% rewrite
rewrite_pre(S) ->
S#state.pid /= undefined andalso S#state.written /= [].
rewrite_args(S) ->
?LET({Off, Len}, get_written_interval(S#state.written),
[S#state.pid, Off, data_with_csum(Len)]).
rewrite(Pid, Offset, {Bin, Tag, Csum}) ->
Meta = [{client_csum_tag, Tag},
{client_csum, Csum}],
machi_file_proxy:write(Pid, Offset, Meta, Bin).
rewrite_post(_S, _Args, Res) ->
is_error(Res).
rewrite_next(S, _Res, _Args) ->
S#state{prev_extra = 0}.
%% Property
prop_ok() ->
cleanup(),
?FORALL(I, shuffle_interval(),
?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I)),
begin
{H, S, Res} = run_parallel_commands(?MODULE, Cmds),
pretty_commands(?MODULE, Cmds, {H, S, Res},
aggregate(command_names(Cmds), Res == ok))
end)
).
-endif. % EQC
-endif. % TEST

View file

@ -0,0 +1,103 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_file_proxy_test).
-ifdef(TEST).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include("machi.hrl").
clean_up_data_dir(DataDir) ->
[begin
Fs = filelib:wildcard(DataDir ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(DataDir),
ok.
-ifndef(PULSE).
-define(TESTDIR, "./t").
-define(HYOOGE, 1 * 1024 * 1024 * 1024). % 1 long GB
random_binary_single() ->
%% OK, I guess it's not that random...
<<"Four score and seven years ago our fathers brought forth on this
continent a new nation, conceived in liberty, and dedicated to the
proposition that all men are created equal.
Now we are engaged in a great civil war, testing whether that nation, or any
nation so conceived and so dedicated, can long endure. We are met on a great
battlefield of that war. We have come to dedicate a portion of that field, as a
final resting place for those who here gave their lives that that nation
might live. It is altogether fitting and proper that we should do this.
But, in a larger sense, we can not dedicate, we can not consecrate, we can not
hallow this ground. The brave men, living and dead, who struggled here, have
consecrated it, far above our poor power to add or detract. The world will
little note, nor long remember what we say here, but it can never forget what
they did here. It is for us the living, rather, to be dedicated here to the
unfinished work which they who fought here have thus far so nobly advanced. It
is rather for us to be here dedicated to the great task remaining before us
that from these honored dead we take increased devotion to that cause for which
they gave the last full measure of devotion that we here highly resolve that
these dead shall not have died in vain that this nation, under God, shall have
a new birth of freedom and that government of the people, by the people, for
the people, shall not perish from the earth.">>.
random_binary(Start, End) ->
Size = byte_size(random_binary_single()) - 1,
case End > Size of
true ->
Copies = ( End div Size ) + 1,
D0 = binary:copy(random_binary_single(), Copies),
binary:part(<<D0/binary>>, Start, End);
false ->
binary:part(random_binary_single(), Start, End)
end.
machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
[
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].
-endif. % !PULSE
-endif. % TEST.