diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl
index 0fe9732..88c83f8 100644
--- a/src/machi_file_proxy.erl
+++ b/src/machi_file_proxy.erl
@@ -34,6 +34,9 @@
%% TODO:
%% 1. Some way to transition the proxy into/out of a wedged state that
%% doesn't rely on message delivery.
+%%
+%% 2. We might need a read repair command which does an unconditional write
+%% on the data block?
-module(machi_file_proxy).
-behaviour(gen_server).
@@ -67,9 +70,11 @@
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).
--type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }.
+-type op_stats() :: { Total :: non_neg_integer(),
+ Errors :: non_neg_integer() }.
+
-type byte_sequence() :: { Offset :: non_neg_integer(),
- Size :: pos_integer()|infinity }.
+ Size :: pos_integer()|infinity }.
-record(state, {
data_dir :: string() | undefined,
@@ -92,6 +97,10 @@
%% Public API
+% @doc Start a new instance of the file proxy service. Takes the filename
+% and data directory as arguments. This function is typically called by the
+% `machi_file_proxy_sup:start_proxy/2' function.
+-spec start_link(Filename :: string(), DataDir :: string()) -> any().
start_link(Filename, DataDir) ->
gen_server:start_link(?MODULE, {Filename, DataDir}, []).
@@ -121,8 +130,8 @@ write(Pid, Offset, Data) ->
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
% that expects the following keys and values:
%
-% - `client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
-% - `client_csum' - the checksum value from the client
+% - `client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
+% - `client_csum' - the checksum value from the client
%
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
Data :: binary()) -> ok|{error, term()}.
@@ -164,6 +173,7 @@ init({Filename, DataDir}) ->
unwritten_bytes = UnwrittenBytes,
eof_position = Eof}}.
+% @private
handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
R = file:sync(FHd),
{reply, R, State};
@@ -310,10 +320,21 @@ handle_call(Req, _From, State) ->
lager:warning("Unknown call: ~p", [Req]),
{reply, whoaaaaaaaaaaaa, State}.
+% @private
handle_cast(Cast, State) ->
lager:warning("Unknown cast: ~p", [Cast]),
{noreply, State}.
+% @private
+handle_info(tick, State = #state{eof_position = Eof}) when Eof >= ?MAX_FILE_SIZE ->
+ lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
+ [Eof, ?MAX_FILE_SIZE]),
+ {stop, file_rollover, State};
+
+%% XXX Is this a good idea? Need to think this through a bit.
+handle_info(tick, State = #state{wedged = true}) ->
+ {stop, wedged, State};
+
%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of
%% errors, we ought to shut down and give up our file descriptors.
handle_info(tick, State = #state{
@@ -380,8 +401,8 @@ handle_info(Req, State) ->
lager:warning("Unknown info message: ~p", [Req]),
{noreply, State}.
-terminate(Reason, #state{
- filename = F,
+% @private
+terminate(Reason, #state{filename = F,
data_filehandle = FHd,
csum_filehandle = FHc,
reads = {RT, RE},
@@ -397,6 +418,7 @@ terminate(Reason, #state{
ok = file:close(FHc),
ok.
+% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -524,6 +546,17 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
) -> {ok, NewU :: [byte_sequence()]} |
{error, written} |
{error, Reason :: term()}.
+% @private Implements the write and append operations. The first task is to
+% determine whether the offset and data size have already been written. If
+% not, the write is allowed to proceed. A special case is made when an offset
+% and data size match a checksum. In that case we read the data off the disk,
+% validate the checksum and return a "fake" ok response, as if the write had
+% been performed even though it has not.
+%
+% If a write proceeds, the offset, size and checksum are written to a metadata
+% file, and the internal list of unwritten bytes is modified to reflect the
+% just-performed write. This is then returned to the caller as
+% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
Size = iolist_size(Data),
@@ -542,20 +575,27 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
{ok, U};
_ ->
{error, written}
- end
+ end;
+ OtherCsum ->
+ %% Got a checksum, but it doesn't match the data block's
+ lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
+ [Offset, Filename, OtherCsum, TaggedCsum]),
+ {error, written}
end;
true ->
try
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
catch
%%% XXX FIXME: be more specific on badmatch that might
- %%% occur around line 533 when we write the checksum
+ %%% occur around line 593 when we write the checksum
%%% file entry for the data blob we just put on the disk
error:Reason ->
{error, Reason}
end
end.
+% @private Implements the disk writes for both the write and append
+% operations.
-spec do_write( FHd :: file:descriptor(),
FHc :: file:descriptor(),
Filename :: string(),
@@ -614,15 +654,16 @@ is_byte_range_unwritten(Offset, Size, Unwritten) ->
% {Position, Space}} is returned. `not_found' is returned if no suitable
% space is located.
lookup_unwritten(_Offset, _Size, []) ->
- not_found;
+ not_found;
lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos ->
- {ok, H};
+ {ok, H};
lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest])
when Offset >= Pos andalso Offset < Pos+Space
andalso Size =< (Space - (Offset - Pos)) ->
- {ok, H};
+ {ok, H};
lookup_unwritten(Offset, Size, [_H|Rest]) ->
- lookup_unwritten(Offset, Size, Rest).
+ %% This entry does not cover the requested range; keep searching the rest.
+ lookup_unwritten(Offset, Size, Rest).
-spec update_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
@@ -645,7 +686,7 @@ update_unwritten(Offset, Size, Unwritten) ->
-spec update_byte_range( Offset :: non_neg_integer(),
Size :: pos_integer(),
Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()].
-% @private Given an offset and size and a byte sequence tuple where a
+% @private Given an offset and size and a byte sequence tuple where a
% write took place, return a list of updates to the list of unwritten bytes
% accounting for the space occupied by the just completed write.
update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof ->