Enforce write-once property #1
2 changed files with 233 additions and 119 deletions
|
@ -23,7 +23,17 @@
|
||||||
%% conceit at the heart of Machi's design.
|
%% conceit at the heart of Machi's design.
|
||||||
%%
|
%%
|
||||||
%% Read, write and append requests for a single file will be managed
|
%% Read, write and append requests for a single file will be managed
|
||||||
%% through this proxy.
|
%% through this proxy. Clients can also request syncs for specific
|
||||||
|
%% types of filehandles.
|
||||||
|
%%
|
||||||
|
%% As operations are requested, the proxy keeps track of how many
|
||||||
|
%% operations it has performed (and how many errors were generated.)
|
||||||
|
%% After a sufficient period of inactivity, the server terminates
|
||||||
|
%% itself.
|
||||||
|
%%
|
||||||
|
%% TODO:
|
||||||
|
%% 1. Some way to transition the proxy into/out of a wedged state that
|
||||||
|
%% doesn't rely on message delivery.
|
||||||
|
|
||||||
-module(machi_file_proxy).
|
-module(machi_file_proxy).
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
@ -38,8 +48,8 @@
|
||||||
read/3,
|
read/3,
|
||||||
write/3,
|
write/3,
|
||||||
write/4,
|
write/4,
|
||||||
append/3,
|
append/2,
|
||||||
append/5
|
append/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -55,6 +65,7 @@
|
||||||
-define(TICK, 5*1000).
|
-define(TICK, 5*1000).
|
||||||
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
|
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
|
||||||
-define(TIMEOUT, 10*1000).
|
-define(TIMEOUT, 10*1000).
|
||||||
|
-define(TOO_MANY_ERRORS_RATIO, 50).
|
||||||
|
|
||||||
-type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }.
|
-type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }.
|
||||||
|
|
||||||
|
@ -62,7 +73,7 @@
|
||||||
data_dir :: string() | undefined,
|
data_dir :: string() | undefined,
|
||||||
filename :: string() | undefined,
|
filename :: string() | undefined,
|
||||||
data_path :: string() | undefined,
|
data_path :: string() | undefined,
|
||||||
sealed = false :: true|false, %% XXX sealed means this file is closed to new writes; not sure if useful
|
wedged = false :: boolean(),
|
||||||
csum_file :: string()|undefined,
|
csum_file :: string()|undefined,
|
||||||
csum_path :: string()|undefined,
|
csum_path :: string()|undefined,
|
||||||
last_write_offset = 0 :: non_neg_integer(),
|
last_write_offset = 0 :: non_neg_integer(),
|
||||||
|
@ -79,27 +90,28 @@
|
||||||
%% Public API
|
%% Public API
|
||||||
|
|
||||||
start_link(Filename, DataDir) ->
|
start_link(Filename, DataDir) ->
|
||||||
gen_server:start_link({local, to_atom(Filename)}, ?MODULE, {Filename, DataDir}, []).
|
gen_server:start_link(?MODULE, {Filename, DataDir}, []).
|
||||||
|
|
||||||
% @doc Force a sync of all filehandles
|
% @doc Force a sync of all filehandles
|
||||||
-spec sync(Filename :: string()) -> ok|{error, term()}.
|
-spec sync(Pid :: pid()) -> ok|{error, term()}.
|
||||||
sync(Filename) ->
|
sync(Pid) ->
|
||||||
sync(Filename, all).
|
sync(Pid, all).
|
||||||
|
|
||||||
% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
|
% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
|
||||||
-spec sync(Filename :: string(), Type :: all|data|csum) -> ok|{error, term()}.
|
-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
|
||||||
sync(Filename, Type) ->
|
sync(Pid, Type) ->
|
||||||
gen_server:call(to_atom(Filename), {sync, Type}, ?TIMEOUT).
|
gen_server:call(Pid, {sync, Type}, ?TIMEOUT).
|
||||||
|
|
||||||
% @doc Read file at offset for length
|
% @doc Read file at offset for length
|
||||||
-spec read(Filename :: string(), Offset :: non_neg_integer(), Length :: non_neg_integer()) -> {ok, binary()}|{error, term()}.
|
-spec read(Pid :: pid(), Offset :: non_neg_integer(),
|
||||||
read(Filename, Offset, Length) ->
|
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()}|{error, term()}.
|
||||||
gen_server:call(to_atom(Filename), {read, Offset, Length}, ?TIMEOUT).
|
read(Pid, Offset, Length) ->
|
||||||
|
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT).
|
||||||
|
|
||||||
% @doc Write data at offset
|
% @doc Write data at offset
|
||||||
-spec write(Filename :: string(), Offset :: non_neg_integer(), Data :: binary()) -> ok.
|
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok.
|
||||||
write(Filename, Offset, Data) ->
|
write(Pid, Offset, Data) ->
|
||||||
write(Filename, Offset, [], Data).
|
write(Pid, Offset, [], Data).
|
||||||
|
|
||||||
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
|
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
|
||||||
% that expects the following keys and values:
|
% that expects the following keys and values:
|
||||||
|
@ -107,22 +119,23 @@ write(Filename, Offset, Data) ->
|
||||||
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
|
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
|
||||||
% <li>`client_csum' - the checksum value from the client
|
% <li>`client_csum' - the checksum value from the client
|
||||||
% </ul>
|
% </ul>
|
||||||
-spec write(Filename :: string(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
|
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
|
||||||
Data :: binary()) -> ok|{error, term()}.
|
Data :: binary()) -> ok|{error, term()}.
|
||||||
write(Filename, Offset, ClientMeta, Data) ->
|
write(Pid, Offset, ClientMeta, Data) ->
|
||||||
gen_server:call(to_atom(Filename), {write, Offset, ClientMeta, Data}, ?TIMEOUT).
|
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT).
|
||||||
|
|
||||||
% @doc Append data at offset
|
% @doc Append data
|
||||||
-spec append(Filename :: string(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
-spec append(Pid :: pid(), Data :: binary()) -> ok|{error, term()}.
|
||||||
append(Filename, Offset, Data) ->
|
append(Pid, Data) ->
|
||||||
append(Filename, Offset, [], 0, Data).
|
append(Pid, [], 0, Data).
|
||||||
|
|
||||||
% @doc Append data at offset, supplying client metadata and (if desired) a reservation for
|
% @doc Append data to file, supplying client metadata and (if desired) a
|
||||||
% additional space. ClientMeta is a proplist and expects the same keys as write/4.
|
% reservation for additional space. ClientMeta is a proplist and expects the
|
||||||
-spec append(Filename :: string(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
|
% same keys as write/4.
|
||||||
|
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
|
||||||
Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
||||||
append(Filename, Offset, ClientMeta, Extra, Data) ->
|
append(Pid, ClientMeta, Extra, Data) ->
|
||||||
gen_server:call(to_atom(Filename), {append, Offset, ClientMeta, Extra, Data}, ?TIMEOUT).
|
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT).
|
||||||
|
|
||||||
%% TODO
|
%% TODO
|
||||||
%% read_repair(Filename, Offset, Data) ???
|
%% read_repair(Filename, Offset, Data) ???
|
||||||
|
@ -135,9 +148,10 @@ append(Filename, Offset, ClientMeta, Extra, Data) ->
|
||||||
init({Filename, DataDir}) ->
|
init({Filename, DataDir}) ->
|
||||||
CsumFile = machi_util:make_csum_filename(DataDir, Filename),
|
CsumFile = machi_util:make_csum_filename(DataDir, Filename),
|
||||||
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
||||||
LastWriteOffset = get_last_offset_from_csum_file(CsumFile),
|
LastWriteOffset = case parse_csum_file(CsumFile) of
|
||||||
%% The paranoid might do a file info request to validate that the
|
0 -> ?MINIMUM_OFFSET;
|
||||||
%% calculated offset is the same as the on-disk file's length
|
V -> V
|
||||||
|
end,
|
||||||
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
||||||
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
|
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
|
||||||
Tref = schedule_tick(),
|
Tref = schedule_tick(),
|
||||||
|
@ -181,13 +195,11 @@ handle_call({sync, all}, _From, State = #state{filename = F,
|
||||||
|
|
||||||
%%% READS
|
%%% READS
|
||||||
|
|
||||||
handle_call({read, Offset, _Length}, _From,
|
handle_call({read, _Offset, _Length}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{wedged = true,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) when Offset > Last ->
|
}) ->
|
||||||
lager:error("Read request at offset ~p is past the last write offset of ~p",
|
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
||||||
[Offset, Last]),
|
|
||||||
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
|
|
||||||
|
|
||||||
handle_call({read, Offset, Length}, _From,
|
handle_call({read, Offset, Length}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{last_write_offset = Last,
|
||||||
|
@ -202,39 +214,26 @@ handle_call({read, Offset, Length}, _From,
|
||||||
data_filehandle = FH,
|
data_filehandle = FH,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{Resp, NewErr} = case file:pread(FH, Offset, Length) of
|
|
||||||
{ok, Bytes} when byte_size(Bytes) == Length ->
|
Checksum = get({Offset, Length}), %% N.B. May be 'undefined'!
|
||||||
lager:debug("successful read at ~p of ~p bytes", [Offset, Length]),
|
|
||||||
{{ok, Bytes}, Err};
|
{Resp, NewErr} = case do_read(FH, F, Checksum, Offset, Length) of
|
||||||
{ok, Partial} ->
|
{ok, Bytes, Csum} ->
|
||||||
lager:error("read ~p bytes, wanted ~p at offset ~p in file ~p",
|
{{ok, Bytes, Csum}, Err};
|
||||||
[byte_size(Partial), Length, Offset, F]),
|
|
||||||
{{error, partial_read}, Err + 1};
|
|
||||||
eof ->
|
eof ->
|
||||||
lager:debug("Got eof on read operation", []),
|
|
||||||
{{error, not_written}, Err + 1};
|
{{error, not_written}, Err + 1};
|
||||||
Other ->
|
Error ->
|
||||||
lager:warning("Got ~p during file read operation on ~p", [Other, F]),
|
{Error, Err + 1}
|
||||||
{{error, Other}, Err + 1}
|
|
||||||
end,
|
end,
|
||||||
{reply, Resp, State#state{reads = {T+1, NewErr}}};
|
{reply, Resp, State#state{reads = {T+1, NewErr}}};
|
||||||
|
|
||||||
%%% WRITES
|
%%% WRITES
|
||||||
|
|
||||||
handle_call({write, _Offset, _ClientMeta, _Data}, _From,
|
handle_call({write, _Offset, _ClientMeta, _Data}, _From,
|
||||||
State = #state{sealed = true,
|
State = #state{wedged = true,
|
||||||
writes = {T, Err}
|
writes = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{reply, {error, sealed}, State#state{writes = {T + 1, Err + 1}}};
|
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
||||||
|
|
||||||
handle_call({write, Offset, _ClientMeta, _Data}, _From,
|
|
||||||
State = #state{last_write_offset = Last,
|
|
||||||
writes = {T, Err}
|
|
||||||
}) when Offset =< Last ->
|
|
||||||
{reply, {error, written}, State#state{writes = {T + 1, Err + 1}}};
|
|
||||||
|
|
||||||
%% XXX: What if the chunk is larger than the max file size??
|
|
||||||
%% XXX: What if the chunk is larger than the physical disk we have??
|
|
||||||
|
|
||||||
handle_call({write, Offset, ClientMeta, Data}, _From,
|
handle_call({write, Offset, ClientMeta, Data}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{last_write_offset = Last,
|
||||||
|
@ -242,86 +241,84 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
|
||||||
writes = {T, Err},
|
writes = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_filehandle = FHc
|
||||||
}) when Offset > Last ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta), %% gets 'undefined' if not found
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
ClientCsum = proplists:get_value(client_csum, ClientMeta), %% also potentially 'undefined'
|
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
||||||
Size = iolist_size(Data),
|
|
||||||
|
|
||||||
{Resp, NewErr, NewLast} =
|
{Resp, NewErr, NewLast} =
|
||||||
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
||||||
{error, Error} ->
|
{error, {bad_csum, Bad}} ->
|
||||||
{{error, Error}, Err + 1, Last};
|
lager:error("Bad checksum on write; client sent ~p, we computed ~p",
|
||||||
|
[ClientCsum, Bad]),
|
||||||
|
{{error, bad_csum}, Err + 1, Last};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
%% Is additional paranoia warranted here? Should we attempt a pread
|
case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data) of
|
||||||
%% at this position
|
|
||||||
case file:pwrite(FHd, Offset, Data) of
|
|
||||||
ok ->
|
ok ->
|
||||||
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
{ok, Err, Last + Offset};
|
||||||
ok = file:write(FHc, EncodedCsum),
|
Error ->
|
||||||
{ok, Err, Last + Size};
|
{Error, Err + 1, Last}
|
||||||
Other ->
|
|
||||||
lager:error("Got ~p during write on file ~p at offset ~p, length ~p",
|
|
||||||
[Other, F, Offset, Size]),
|
|
||||||
{Other, Err + 1, Last} %% How do we detect partial writes? Pretend they don't exist? :)
|
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{reply, Resp, State#state{writes = {T+1, NewErr}, last_write_offset = NewLast}};
|
{reply, Resp, State#state{writes = {T+1, NewErr}, last_write_offset = NewLast}};
|
||||||
|
|
||||||
%% APPENDS
|
%% APPENDS
|
||||||
|
|
||||||
handle_call({append, _Offset, _ClientMeta, _Extra, _Data}, _From,
|
handle_call({append, _ClientMeta, _Extra, _Data}, _From,
|
||||||
State = #state{sealed = true,
|
State = #state{wedged = true,
|
||||||
appends = {T, Err}
|
appends = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{reply, {error, sealed}, State#state{appends = {T+1, Err+1}}};
|
{reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};
|
||||||
|
|
||||||
handle_call({append, Offset, _ClientMeta, _Extra, _Data}, _From,
|
handle_call({append, ClientMeta, Extra, Data}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
|
||||||
appends = {T, Err}
|
|
||||||
}) when Offset =< Last ->
|
|
||||||
{reply, {error, written}, State#state{appends = {T+1, Err+1}}};
|
|
||||||
|
|
||||||
handle_call({append, Offset, ClientMeta, Extra, Data}, _From,
|
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{last_write_offset = Last,
|
||||||
filename = F,
|
filename = F,
|
||||||
appends = {T, Err},
|
appends = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_filehandle = FHc
|
||||||
}) when Offset > Last ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta), %% gets 'undefined' if not found
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
ClientCsum = proplists:get_value(client_csum, ClientMeta), %% also potentially 'undefined'
|
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
||||||
Size = iolist_size(Data),
|
Size = iolist_size(Data),
|
||||||
|
|
||||||
{Resp, NewErr, NewLast} =
|
{Resp, NewErr, NewLast} =
|
||||||
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
||||||
{error, Error} ->
|
{error, {bad_csum, Bad}} ->
|
||||||
{{error, Error}, Err + 1, Last};
|
lager:error("Bad checksum; client sent ~p, we computed ~p",
|
||||||
|
[ClientCsum, Bad]),
|
||||||
|
{{error, bad_csum}, Err + 1, Last};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
%% Is additional paranoia warranted here?
|
case handle_write(FHd, FHc, F, TaggedCsum, Last, Data) of
|
||||||
%% Should we attempt a pread at offset?
|
|
||||||
case file:pwrite(FHd, Offset, Data) of
|
|
||||||
ok ->
|
ok ->
|
||||||
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
{{ok, F, Last}, Err, Last + Size + Extra};
|
||||||
ok = file:write(FHc, EncodedCsum),
|
Error ->
|
||||||
{ok, Err, Last + Size + Extra};
|
{Error, Err + 1, Last}
|
||||||
Other ->
|
|
||||||
lager:error("Got ~p during append on file ~p at offset ~p, length ~p",
|
|
||||||
[Other, F, Offset, Size]),
|
|
||||||
{Other, Err + 1, Last} %% How do we detect partial writes? Pretend they don't exist? :)
|
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{reply, Resp, State#state{appends = {T+1, NewErr}, last_write_offset = NewLast}};
|
{reply, Resp, State#state{appends = {T+1, NewErr}, last_write_offset = NewLast}};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:warning("Unknown call: ~p", [Req]),
|
lager:warning("Unknown call: ~p", [Req]),
|
||||||
{reply, whaaaaaaaaaa, State}.
|
{reply, whoaaaaaaaaaaaa, State}.
|
||||||
|
|
||||||
handle_cast(Cast, State) ->
|
handle_cast(Cast, State) ->
|
||||||
lager:warning("Unknown cast: ~p", [Cast]),
|
lager:warning("Unknown cast: ~p", [Cast]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of
|
||||||
|
%% errors, we ought to shut down and give up our file descriptors.
|
||||||
|
handle_info(tick, State = #state{
|
||||||
|
ops = Ops,
|
||||||
|
reads = {RT, RE},
|
||||||
|
writes = {WT, WE},
|
||||||
|
appends = {AT, AE}
|
||||||
|
}) when Ops > 100 andalso
|
||||||
|
trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO ->
|
||||||
|
Errors = RE + WE + AE,
|
||||||
|
lager:notice("Got ~p errors. Shutting down.", [Errors]),
|
||||||
|
{stop, too_many_errors, State};
|
||||||
|
|
||||||
handle_info(tick, State = #state{
|
handle_info(tick, State = #state{
|
||||||
ticks = Ticks,
|
ticks = Ticks,
|
||||||
ops = Ops,
|
ops = Ops,
|
||||||
|
@ -351,6 +348,26 @@ handle_info(tick, State = #state{
|
||||||
Tref = schedule_tick(),
|
Tref = schedule_tick(),
|
||||||
{noreply, State#state{tref = Tref, ops = Ops}};
|
{noreply, State#state{tref = Tref, ops = Ops}};
|
||||||
|
|
||||||
|
%handle_info({wedged, EpochId} State = #state{epoch = E}) when E /= EpochId ->
|
||||||
|
% lager:notice("Wedge epoch ~p but ignoring because our epoch id is ~p", [EpochId, E]),
|
||||||
|
% {noreply, State};
|
||||||
|
|
||||||
|
%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E == EpochId ->
|
||||||
|
% lager:notice("Wedge epoch ~p same as our epoch id ~p; we are wedged. Bummer.", [EpochId, E]),
|
||||||
|
% {noreply, State#state{wedged = true}};
|
||||||
|
|
||||||
|
% flu1.erl:
|
||||||
|
% ProxyPid = get_proxy_pid(Filename),
|
||||||
|
% Are we wedged? if not
|
||||||
|
% machi_file_proxy:read(Pid, Offset, Length)
|
||||||
|
% otherwise -> error,wedged
|
||||||
|
%
|
||||||
|
% get_proxy_pid(Filename) ->
|
||||||
|
% Pid = lookup_pid(Filename)
|
||||||
|
% is_pid_alive(Pid)
|
||||||
|
% Pid
|
||||||
|
% if not alive then start one
|
||||||
|
|
||||||
handle_info(Req, State) ->
|
handle_info(Req, State) ->
|
||||||
lager:warning("Unknown info message: ~p", [Req]),
|
lager:warning("Unknown info message: ~p", [Req]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -377,22 +394,26 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
%% Private functions
|
%% Private functions
|
||||||
|
|
||||||
|
-spec schedule_tick() -> reference().
|
||||||
schedule_tick() ->
|
schedule_tick() ->
|
||||||
erlang:send_after(?TICK, self(), tick).
|
erlang:send_after(?TICK, self(), tick).
|
||||||
|
|
||||||
check_or_make_tagged_csum(undefined, undefined, Data) ->
|
-spec check_or_make_tagged_csum(Type :: binary(),
|
||||||
check_or_make_tagged_csum(?CSUM_TAG_NONE, undefined, Data);
|
Checksum :: binary(),
|
||||||
|
Data :: binary() ) -> binary() |
|
||||||
|
{error, {bad_csum, Bad :: binary()}}.
|
||||||
check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
|
check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
|
||||||
%% We are making a checksum here
|
%% We are making a checksum here
|
||||||
Csum = machi_util:checksum_chunk(Data),
|
Csum = machi_util:checksum_chunk(Data),
|
||||||
machi_util:make_tagged_csum(server_sha, Csum);
|
machi_util:make_tagged_csum(server_sha, Csum);
|
||||||
check_or_make_tagged_csum(?CSUM_TAG_CLIENT_SHA, ClientCsum, Data) ->
|
check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
|
||||||
|
Tag == ?CSUM_TAG_SERVER_SHA ->
|
||||||
Csum = machi_util:checksum_chunk(Data),
|
Csum = machi_util:checksum_chunk(Data),
|
||||||
case Csum =:= ClientCsum of
|
case Csum =:= InCsum of
|
||||||
true ->
|
true ->
|
||||||
machi_util:make_tagged_csum(server_sha, Csum);
|
machi_util:make_tagged_csum(server_sha, Csum);
|
||||||
false ->
|
false ->
|
||||||
{error, bad_csum}
|
{error, {bad_csum, Csum}}
|
||||||
end;
|
end;
|
||||||
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
||||||
lager:warning("Unknown checksum tag ~p", [OtherTag]),
|
lager:warning("Unknown checksum tag ~p", [OtherTag]),
|
||||||
|
@ -403,17 +424,110 @@ encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
||||||
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
||||||
TaggedCSum].
|
TaggedCSum].
|
||||||
|
|
||||||
get_last_offset_from_csum_file(Filename) ->
|
map_offsets_to_csums(CsumList) ->
|
||||||
|
lists:foreach(fun insert_offsets/1, CsumList).
|
||||||
|
|
||||||
|
insert_offsets({Offset, Length, Checksum}) ->
|
||||||
|
put({Offset, Length}, Checksum).
|
||||||
|
|
||||||
|
parse_csum_file(Filename) ->
|
||||||
{ok, CsumData} = file:read_file(Filename),
|
{ok, CsumData} = file:read_file(Filename),
|
||||||
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
|
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
|
||||||
case DecodedCsums of
|
case DecodedCsums of
|
||||||
[] -> 0;
|
[] -> 0;
|
||||||
_ ->
|
_ ->
|
||||||
|
map_offsets_to_csums(DecodedCsums),
|
||||||
{Offset, Size, _Csum} = lists:last(DecodedCsums),
|
{Offset, Size, _Csum} = lists:last(DecodedCsums),
|
||||||
Offset + Size
|
Offset + Size
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_atom(String) when is_list(String) ->
|
-spec do_read(FHd :: file:filehandle(),
|
||||||
%% XXX FIXME: leaks atoms, yo.
|
Filename :: string(),
|
||||||
list_to_atom(String).
|
TaggedCsum :: undefined|binary(),
|
||||||
|
Offset :: non_neg_integer(),
|
||||||
|
Size :: non_neg_integer()) -> eof |
|
||||||
|
{ok, Bytes :: binary(), Csum :: binary()} |
|
||||||
|
{error, bad_csum} |
|
||||||
|
{error, partial_read} |
|
||||||
|
{error, Other :: term() }.
|
||||||
|
do_read(FHd, Filename, undefined, Offset, Size) ->
|
||||||
|
do_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size);
|
||||||
|
|
||||||
|
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
|
case file:pread(FHd, Offset, Size) of
|
||||||
|
eof ->
|
||||||
|
eof;
|
||||||
|
|
||||||
|
{ok, Bytes} when byte_size(Bytes) == Size ->
|
||||||
|
{Type, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
||||||
|
case check_or_make_tagged_csum(Type, Ck, Bytes) of
|
||||||
|
{error, Bad} ->
|
||||||
|
lager:error("Bad checksum; got ~p, expected ~p",
|
||||||
|
[Bad, Ck]),
|
||||||
|
{error, bad_csum};
|
||||||
|
TaggedCsum ->
|
||||||
|
{ok, Bytes, TaggedCsum}
|
||||||
|
end;
|
||||||
|
|
||||||
|
{ok, Partial} ->
|
||||||
|
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
|
||||||
|
[Filename, Offset, Size, byte_size(Partial)]),
|
||||||
|
{error, partial_read};
|
||||||
|
|
||||||
|
Other ->
|
||||||
|
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
|
||||||
|
[Filename, Offset, Size, Other]),
|
||||||
|
{error, Other}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec handle_write( FHd :: file:filehandle(),
|
||||||
|
FHc :: file:filehandle(),
|
||||||
|
Filename :: string(),
|
||||||
|
TaggedCsum :: binary(),
|
||||||
|
Offset :: non_neg_integer(),
|
||||||
|
Data :: binary() ) -> ok |
|
||||||
|
{error, written} |
|
||||||
|
{error, Reason :: term()}.
|
||||||
|
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data) ->
|
||||||
|
Size = iolist_size(Data),
|
||||||
|
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
|
||||||
|
eof ->
|
||||||
|
try
|
||||||
|
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data)
|
||||||
|
catch
|
||||||
|
%%% XXX FIXME: be more specific on badmatch that might
|
||||||
|
%%% occur around line 520 when we write the checksum
|
||||||
|
%%% file entry for the data blob we just put on the disk
|
||||||
|
error:Reason ->
|
||||||
|
{error, Reason}
|
||||||
|
end;
|
||||||
|
{ok, _, _} ->
|
||||||
|
% The bytes already on disk at this offset passed the checksum check, so report the write as done.
|
||||||
|
ok;
|
||||||
|
{error, Error} ->
|
||||||
|
lager:error("During write to ~p, offset ~p, got error ~p; returning {error, written}",
|
||||||
|
[Filename, Offset, Error]),
|
||||||
|
{error, written}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec do_write( FHd :: file:descriptor(),
|
||||||
|
FHc :: file:descriptor(),
|
||||||
|
Filename :: string(),
|
||||||
|
TaggedCsum :: binary(),
|
||||||
|
Offset :: non_neg_integer(),
|
||||||
|
Size :: non_neg_integer(),
|
||||||
|
Data :: binary() ) -> ok|term().
|
||||||
|
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data) ->
|
||||||
|
case file:pwrite(FHd, Offset, Data) of
|
||||||
|
ok ->
|
||||||
|
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
||||||
|
[Filename, Offset, Size]),
|
||||||
|
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
||||||
|
ok = file:write(FHc, EncodedCsum),
|
||||||
|
lager:debug("Successful write to checksum file for ~p.", [Filename]),
|
||||||
|
ok;
|
||||||
|
Other ->
|
||||||
|
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
|
||||||
|
[Other, Filename, Offset, Size]),
|
||||||
|
{error, Other}
|
||||||
|
end.
|
||||||
|
|
|
@ -37,7 +37,7 @@ start_link() ->
|
||||||
supervisor:start_link(?MODULE, []).
|
supervisor:start_link(?MODULE, []).
|
||||||
|
|
||||||
start_proxy(Filename, DataDir) ->
|
start_proxy(Filename, DataDir) ->
|
||||||
supervisor:start_child([{Filename, DataDir}]).
|
supervisor:start_child([Filename, DataDir]).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
SupFlags = {simple_one_for_one, 1000, 10},
|
SupFlags = {simple_one_for_one, 1000, 10},
|
||||||
|
|
Loading…
Reference in a new issue