Now with 100% more written byte tracking
This commit is contained in:
parent
2bcc7d0680
commit
06b4890a63
1 changed files with 259 additions and 113 deletions
|
@ -68,23 +68,26 @@
|
||||||
-define(TOO_MANY_ERRORS_RATIO, 50).
|
-define(TOO_MANY_ERRORS_RATIO, 50).
|
||||||
|
|
||||||
-type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }.
|
-type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }.
|
||||||
|
-type byte_sequence() :: { Offset :: non_neg_integer(),
|
||||||
|
Size :: pos_integer()|infinity }.
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
data_dir :: string() | undefined,
|
data_dir :: string() | undefined,
|
||||||
filename :: string() | undefined,
|
filename :: string() | undefined,
|
||||||
data_path :: string() | undefined,
|
data_path :: string() | undefined,
|
||||||
wedged = false :: boolean(),
|
wedged = false :: boolean(),
|
||||||
csum_file :: string()|undefined,
|
csum_file :: string()|undefined,
|
||||||
csum_path :: string()|undefined,
|
csum_path :: string()|undefined,
|
||||||
last_write_offset = 0 :: non_neg_integer(),
|
eof_position = 0 :: non_neg_integer(),
|
||||||
data_filehandle :: file:filehandle(),
|
unwritten_bytes = [] :: [byte_sequence()],
|
||||||
csum_filehandle :: file:filehandle(),
|
data_filehandle :: file:filehandle(),
|
||||||
tref :: reference(), %% timer ref
|
csum_filehandle :: file:filehandle(),
|
||||||
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
|
tref :: reference(), %% timer ref
|
||||||
ops = 0 :: non_neg_integer(), %% sum of all ops
|
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
|
||||||
reads = {0, 0} :: op_stats(),
|
ops = 0 :: non_neg_integer(), %% sum of all ops
|
||||||
writes = {0, 0} :: op_stats(),
|
reads = {0, 0} :: op_stats(),
|
||||||
appends = {0, 0} :: op_stats()
|
writes = {0, 0} :: op_stats(),
|
||||||
|
appends = {0, 0} :: op_stats()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% Public API
|
%% Public API
|
||||||
|
@ -103,23 +106,25 @@ sync(Pid, Type) ->
|
||||||
gen_server:call(Pid, {sync, Type}, ?TIMEOUT).
|
gen_server:call(Pid, {sync, Type}, ?TIMEOUT).
|
||||||
|
|
||||||
% @doc Read file at offset for length
|
% @doc Read file at offset for length
|
||||||
-spec read(Pid :: pid(), Offset :: non_neg_integer(),
|
-spec read(Pid :: pid(),
|
||||||
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()}|{error, term()}.
|
Offset :: non_neg_integer(),
|
||||||
|
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
|
||||||
|
{error, Reason :: term()}.
|
||||||
read(Pid, Offset, Length) ->
|
read(Pid, Offset, Length) ->
|
||||||
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT).
|
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT).
|
||||||
|
|
||||||
% @doc Write data at offset
|
% @doc Write data at offset
|
||||||
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok.
|
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
||||||
write(Pid, Offset, Data) ->
|
write(Pid, Offset, Data) ->
|
||||||
write(Pid, Offset, [], Data).
|
write(Pid, Offset, [], Data).
|
||||||
|
|
||||||
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
|
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
|
||||||
% that expects the following keys and values:
|
% that expects the following keys and values:
|
||||||
% <ul>
|
% <ul>
|
||||||
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
|
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file
|
||||||
% <li>`client_csum' - the checksum value from the client
|
% <li>`client_csum' - the checksum value from the client
|
||||||
% </ul>
|
% </ul>
|
||||||
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
|
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
|
||||||
Data :: binary()) -> ok|{error, term()}.
|
Data :: binary()) -> ok|{error, term()}.
|
||||||
write(Pid, Offset, ClientMeta, Data) ->
|
write(Pid, Offset, ClientMeta, Data) ->
|
||||||
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT).
|
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT).
|
||||||
|
@ -132,38 +137,32 @@ append(Pid, Data) ->
|
||||||
% @doc Append data to file, supplying client metadata and (if desired) a
|
% @doc Append data to file, supplying client metadata and (if desired) a
|
||||||
% reservation for additional space. ClientMeta is a proplist and expects the
|
% reservation for additional space. ClientMeta is a proplist and expects the
|
||||||
% same keys as write/4.
|
% same keys as write/4.
|
||||||
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
|
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
|
||||||
Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
|
||||||
append(Pid, ClientMeta, Extra, Data) ->
|
append(Pid, ClientMeta, Extra, Data) ->
|
||||||
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT).
|
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT).
|
||||||
|
|
||||||
%% TODO
|
|
||||||
%% read_repair(Filename, Offset, Data) ???
|
|
||||||
%% makes sense to me, but we could make the write path serve both purposes
|
|
||||||
%% I suppose...
|
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
|
||||||
% @private
|
% @private
|
||||||
init({Filename, DataDir}) ->
|
init({Filename, DataDir}) ->
|
||||||
CsumFile = machi_util:make_csum_filename(DataDir, Filename),
|
CsumFile = machi_util:make_csum_filename(DataDir, Filename),
|
||||||
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
|
||||||
LastWriteOffset = case parse_csum_file(CsumFile) of
|
UnwrittenBytes = parse_csum_file(CsumFile),
|
||||||
0 -> ?MINIMUM_OFFSET;
|
{Eof, infinity} = lists:last(UnwrittenBytes),
|
||||||
V -> V
|
|
||||||
end,
|
|
||||||
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
|
||||||
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
|
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
|
||||||
Tref = schedule_tick(),
|
Tref = schedule_tick(),
|
||||||
{ok, #state{
|
{ok, #state{
|
||||||
filename = Filename,
|
filename = Filename,
|
||||||
data_dir = DataDir,
|
data_dir = DataDir,
|
||||||
data_path = DPath,
|
data_path = DPath,
|
||||||
csum_file = CsumFile,
|
csum_file = CsumFile,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc,
|
csum_filehandle = FHc,
|
||||||
tref = Tref,
|
tref = Tref,
|
||||||
last_write_offset = LastWriteOffset}}.
|
unwritten_bytes = UnwrittenBytes,
|
||||||
|
eof_position = Eof}}.
|
||||||
|
|
||||||
handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
|
handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
|
||||||
R = file:sync(FHd),
|
R = file:sync(FHd),
|
||||||
|
@ -195,29 +194,30 @@ handle_call({sync, all}, _From, State = #state{filename = F,
|
||||||
|
|
||||||
%%% READS
|
%%% READS
|
||||||
|
|
||||||
handle_call({read, _Offset, _Length}, _From,
|
handle_call({read, _Offset, _Length}, _From,
|
||||||
State = #state{wedged = true,
|
State = #state{wedged = true,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
||||||
|
|
||||||
handle_call({read, Offset, Length}, _From,
|
handle_call({read, Offset, Length}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{eof_position = Eof,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) when Offset + Length > Last ->
|
}) when Offset + Length > Eof ->
|
||||||
lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
|
lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
|
||||||
[Offset, Length, Last]),
|
[Offset, Length, Eof]),
|
||||||
{reply, {error, not_written}, State = #state{reads = {T + 1, Err + 1}}};
|
{reply, {error, not_written}, State = #state{reads = {T + 1, Err + 1}}};
|
||||||
|
|
||||||
handle_call({read, Offset, Length}, _From,
|
handle_call({read, Offset, Length}, _From,
|
||||||
State = #state{filename = F,
|
State = #state{filename = F,
|
||||||
data_filehandle = FH,
|
data_filehandle = FH,
|
||||||
|
unwritten_bytes = U,
|
||||||
reads = {T, Err}
|
reads = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'!
|
Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'!
|
||||||
|
|
||||||
{Resp, NewErr} = case do_read(FH, F, Checksum, Offset, Length) of
|
{Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
|
||||||
{ok, Bytes, Csum} ->
|
{ok, Bytes, Csum} ->
|
||||||
{{ok, Bytes, Csum}, Err};
|
{{ok, Bytes, Csum}, Err};
|
||||||
eof ->
|
eof ->
|
||||||
|
@ -229,74 +229,82 @@ handle_call({read, Offset, Length}, _From,
|
||||||
|
|
||||||
%%% WRITES
|
%%% WRITES
|
||||||
|
|
||||||
handle_call({write, _Offset, _ClientMeta, _Data}, _From,
|
handle_call({write, _Offset, _ClientMeta, _Data}, _From,
|
||||||
State = #state{wedged = true,
|
State = #state{wedged = true,
|
||||||
writes = {T, Err}
|
writes = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
||||||
|
|
||||||
handle_call({write, Offset, ClientMeta, Data}, _From,
|
handle_call({write, Offset, ClientMeta, Data}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{unwritten_bytes = U,
|
||||||
filename = F,
|
filename = F,
|
||||||
writes = {T, Err},
|
writes = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_filehandle = FHc
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
||||||
|
|
||||||
{Resp, NewErr, NewLast} =
|
{Resp, NewErr, NewU} =
|
||||||
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
||||||
{error, {bad_csum, Bad}} ->
|
{error, {bad_csum, Bad}} ->
|
||||||
lager:error("Bad checksum on write; client sent ~p, we computed ~p",
|
lager:error("Bad checksum on write; client sent ~p, we computed ~p",
|
||||||
[ClientCsum, Bad]),
|
[ClientCsum, Bad]),
|
||||||
{{error, bad_csum}, Err + 1, Last};
|
{{error, bad_csum}, Err + 1, U};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data) of
|
case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of
|
||||||
ok ->
|
{ok, NewU1} ->
|
||||||
{ok, Err, Last + Offset};
|
{ok, Err, NewU1};
|
||||||
Error ->
|
Error ->
|
||||||
{Error, Err + 1, Last}
|
{Error, Err + 1, U}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{reply, Resp, State#state{writes = {T+1, NewErr}, last_write_offset = NewLast}};
|
{NewEof, infinity} = lists:last(NewU),
|
||||||
|
{reply, Resp, State#state{writes = {T+1, NewErr},
|
||||||
|
eof_position = NewEof,
|
||||||
|
unwritten_bytes = NewU
|
||||||
|
}};
|
||||||
|
|
||||||
%% APPENDS
|
%% APPENDS
|
||||||
|
|
||||||
handle_call({append, _ClientMeta, _Extra, _Data}, _From,
|
handle_call({append, _ClientMeta, _Extra, _Data}, _From,
|
||||||
State = #state{wedged = true,
|
State = #state{wedged = true,
|
||||||
appends = {T, Err}
|
appends = {T, Err}
|
||||||
}) ->
|
}) ->
|
||||||
{reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};
|
{reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};
|
||||||
|
|
||||||
handle_call({append, ClientMeta, Extra, Data}, _From,
|
handle_call({append, ClientMeta, Extra, Data}, _From,
|
||||||
State = #state{last_write_offset = Last,
|
State = #state{eof_position = EofP,
|
||||||
|
unwritten_bytes = U,
|
||||||
filename = F,
|
filename = F,
|
||||||
appends = {T, Err},
|
appends = {T, Err},
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc
|
csum_filehandle = FHc
|
||||||
}) ->
|
}) ->
|
||||||
|
|
||||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||||
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
|
||||||
Size = iolist_size(Data),
|
|
||||||
|
|
||||||
{Resp, NewErr, NewLast} =
|
{Resp, NewErr, NewU} =
|
||||||
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
|
||||||
{error, {bad_csum, Bad}} ->
|
{error, {bad_csum, Bad}} ->
|
||||||
lager:error("Bad checksum; client sent ~p, we computed ~p",
|
lager:error("Bad checksum; client sent ~p, we computed ~p",
|
||||||
[ClientCsum, Bad]),
|
[ClientCsum, Bad]),
|
||||||
{{error, bad_csum}, Err + 1, Last};
|
{{error, bad_csum}, Err + 1, U};
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
case handle_write(FHd, FHc, F, TaggedCsum, Last, Data) of
|
case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of
|
||||||
ok ->
|
{ok, NewU1} ->
|
||||||
{{ok, F, Last}, Err, Last + Size + Extra};
|
{{ok, F, EofP}, Err, NewU1};
|
||||||
Error ->
|
Error ->
|
||||||
{Error, Err + 1, Last}
|
{Error, Err + 1, EofP, U}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{reply, Resp, State#state{appends = {T+1, NewErr}, last_write_offset = NewLast}};
|
{NewEof, infinity} = lists:last(NewU),
|
||||||
|
{reply, Resp, State#state{appends = {T+1, NewErr},
|
||||||
|
eof_position = NewEof + Extra,
|
||||||
|
unwritten_bytes = NewU
|
||||||
|
}};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:warning("Unknown call: ~p", [Req]),
|
lager:warning("Unknown call: ~p", [Req]),
|
||||||
|
@ -313,7 +321,7 @@ handle_info(tick, State = #state{
|
||||||
reads = {RT, RE},
|
reads = {RT, RE},
|
||||||
writes = {WT, WE},
|
writes = {WT, WE},
|
||||||
appends = {AT, AE}
|
appends = {AT, AE}
|
||||||
}) when Ops > 100 andalso
|
}) when Ops > 100 andalso
|
||||||
trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO ->
|
trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO ->
|
||||||
Errors = RE + WE + AE,
|
Errors = RE + WE + AE,
|
||||||
lager:notice("Got ~p errors. Shutting down.", [Errors]),
|
lager:notice("Got ~p errors. Shutting down.", [Errors]),
|
||||||
|
@ -374,7 +382,7 @@ handle_info(Req, State) ->
|
||||||
|
|
||||||
terminate(Reason, #state{
|
terminate(Reason, #state{
|
||||||
filename = F,
|
filename = F,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_filehandle = FHc,
|
csum_filehandle = FHc,
|
||||||
reads = {RT, RE},
|
reads = {RT, RE},
|
||||||
writes = {WT, WE},
|
writes = {WT, WE},
|
||||||
|
@ -398,7 +406,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
schedule_tick() ->
|
schedule_tick() ->
|
||||||
erlang:send_after(?TICK, self(), tick).
|
erlang:send_after(?TICK, self(), tick).
|
||||||
|
|
||||||
-spec check_or_make_tagged_csum(Type :: binary(),
|
-spec check_or_make_tagged_csum(Type :: binary(),
|
||||||
Checksum :: binary(),
|
Checksum :: binary(),
|
||||||
Data :: binary() ) -> binary() |
|
Data :: binary() ) -> binary() |
|
||||||
{error, {bad_csum, Bad :: binary()}}.
|
{error, {bad_csum, Bad :: binary()}}.
|
||||||
|
@ -406,7 +414,7 @@ check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
|
||||||
%% We are making a checksum here
|
%% We are making a checksum here
|
||||||
Csum = machi_util:checksum_chunk(Data),
|
Csum = machi_util:checksum_chunk(Data),
|
||||||
machi_util:make_tagged_csum(server_sha, Csum);
|
machi_util:make_tagged_csum(server_sha, Csum);
|
||||||
check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
|
check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
|
||||||
Tag == ?CSUM_TAG_SERVER_SHA ->
|
Tag == ?CSUM_TAG_SERVER_SHA ->
|
||||||
Csum = machi_util:checksum_chunk(Data),
|
Csum = machi_util:checksum_chunk(Data),
|
||||||
case Csum =:= InCsum of
|
case Csum =:= InCsum of
|
||||||
|
@ -430,34 +438,62 @@ map_offsets_to_csums(CsumList) ->
|
||||||
insert_offsets({Offset, Length, Checksum}) ->
|
insert_offsets({Offset, Length, Checksum}) ->
|
||||||
put({Offset, Length}, Checksum).
|
put({Offset, Length}, Checksum).
|
||||||
|
|
||||||
|
-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
|
||||||
parse_csum_file(Filename) ->
|
parse_csum_file(Filename) ->
|
||||||
|
%% using file:read_file works as long as the files are "small"
|
||||||
{ok, CsumData} = file:read_file(Filename),
|
{ok, CsumData} = file:read_file(Filename),
|
||||||
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
|
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
|
||||||
case DecodedCsums of
|
Sort = lists:sort(DecodedCsums),
|
||||||
[] -> 0;
|
case Sort of
|
||||||
|
[] -> [{?MINIMUM_OFFSET, infinity}];
|
||||||
_ ->
|
_ ->
|
||||||
map_offsets_to_csums(DecodedCsums),
|
map_offsets_to_csums(DecodedCsums),
|
||||||
{Offset, Size, _Csum} = lists:last(DecodedCsums),
|
{First, _, _} = hd(Sort),
|
||||||
Offset + Size
|
build_unwritten_bytes_list(Sort, First, [])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec do_read(FHd :: file:filehandle(),
|
-spec handle_read(FHd :: file:filehandle(),
|
||||||
Filename :: string(),
|
Filename :: string(),
|
||||||
TaggedCsum :: undefined|binary(),
|
TaggedCsum :: undefined|binary(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
Size :: non_neg_integer()) -> eof |
|
Size :: non_neg_integer(),
|
||||||
{ok, Bytes :: binary(), Csum :: binary()} |
|
Unwritten :: [byte_sequence()]
|
||||||
{error, bad_csum} |
|
) -> {ok, Bytes :: binary(), Csum :: binary()} |
|
||||||
{error, partial_read} |
|
eof |
|
||||||
{error, Other :: term() }.
|
{error, bad_csum} |
|
||||||
do_read(FHd, Filename, undefined, Offset, Size) ->
|
{error, partial_read} |
|
||||||
do_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size);
|
{error, not_written} |
|
||||||
|
{error, Other :: term() }.
|
||||||
|
% @private Try to read `Size' bytes starting at `Offset'.
%
% A checksum of `undefined' is first replaced by the tagged checksum that
% `machi_util:make_tagged_csum(none)' builds; the request is then retried
% with that value.
%
% <ul>
% <li> If `is_byte_range_unwritten/3' reports the range as unwritten,
%      `{error, not_written}' is returned and the disk is not touched.</li>
% <li> Otherwise the request is handed to `do_read/5' and its result is
%      returned as is: `{ok, Bytes, Checksum}' on success, `eof' when
%      `file:pread/3' reports end of file, and `{error, partial_read}' when
%      fewer bytes than requested came back. The spec for this function
%      additionally lists `{error, bad_csum}' and `{error, Other}'.</li>
% </ul>
handle_read(FHd, Filename, undefined, Offset, Size, U) ->
    NoCsum = machi_util:make_tagged_csum(none),
    handle_read(FHd, Filename, NoCsum, Offset, Size, U);
handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
    case is_byte_range_unwritten(Offset, Size, U) of
        false ->
            do_read(FHd, Filename, TaggedCsum, Offset, Size);
        true ->
            {error, not_written}
    end.
|
||||||
|
|
||||||
|
% @private Implements the disk read
|
||||||
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
case file:pread(FHd, Offset, Size) of
|
case file:pread(FHd, Offset, Size) of
|
||||||
eof ->
|
eof ->
|
||||||
eof;
|
eof;
|
||||||
|
|
||||||
{ok, Bytes} when byte_size(Bytes) == Size ->
|
{ok, Bytes} when byte_size(Bytes) == Size ->
|
||||||
{Type, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
{Type, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
||||||
case check_or_make_tagged_csum(Type, Ck, Bytes) of
|
case check_or_make_tagged_csum(Type, Ck, Bytes) of
|
||||||
|
@ -468,12 +504,10 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
TaggedCsum ->
|
TaggedCsum ->
|
||||||
{ok, Bytes, TaggedCsum}
|
{ok, Bytes, TaggedCsum}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
{ok, Partial} ->
|
{ok, Partial} ->
|
||||||
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
|
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
|
||||||
[Filename, Offset, Size, byte_size(Partial)]),
|
[Filename, Offset, Size, byte_size(Partial)]),
|
||||||
{error, partial_read};
|
{error, partial_read};
|
||||||
|
|
||||||
Other ->
|
Other ->
|
||||||
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
|
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
|
||||||
[Filename, Offset, Size, Other]),
|
[Filename, Offset, Size, Other]),
|
||||||
|
@ -485,29 +519,41 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
|
||||||
Filename :: string(),
|
Filename :: string(),
|
||||||
TaggedCsum :: binary(),
|
TaggedCsum :: binary(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
Data :: binary() ) -> ok |
|
Data :: binary(),
|
||||||
{error, written} |
|
Unwritten :: [byte_sequence()]
|
||||||
{error, Reason :: term()}.
|
) -> {ok, NewU :: [byte_sequence()]} |
|
||||||
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data) ->
|
{error, written} |
|
||||||
|
{error, Reason :: term()}.
|
||||||
|
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
|
||||||
Size = iolist_size(Data),
|
Size = iolist_size(Data),
|
||||||
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
|
|
||||||
eof ->
|
case is_byte_range_unwritten(Offset, Size, U) of
|
||||||
|
false ->
|
||||||
|
case get({Offset, Size}) of
|
||||||
|
undefined ->
|
||||||
|
{error, written};
|
||||||
|
TaggedCsum ->
|
||||||
|
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
|
||||||
|
eof ->
|
||||||
|
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
|
||||||
|
[Offset, Filename]),
|
||||||
|
{error, server_insanity};
|
||||||
|
{ok, _, _} ->
|
||||||
|
{ok, U};
|
||||||
|
_ ->
|
||||||
|
{error, written}
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
try
|
try
|
||||||
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data)
|
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
|
||||||
catch
|
catch
|
||||||
%%% XXX FIXME: be more specific on badmatch that might
|
%%% XXX FIXME: be more specific on badmatch that might
|
||||||
%%% occur around line 520 when we write the checksum
|
%%% occur around line 533 when we write the checksum
|
||||||
%%% file entry for the data blob we just put on the disk
|
%%% file entry for the data blob we just put on the disk
|
||||||
error:Reason ->
|
error:Reason ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end
|
||||||
{ok, _, _} ->
|
|
||||||
% yep, we did that write! Honest.
|
|
||||||
ok;
|
|
||||||
{error, Error} ->
|
|
||||||
lager:error("During write to ~p, offset ~p, got error ~p; returning {error, written}",
|
|
||||||
[Filename, Offset, Error]),
|
|
||||||
{error, written}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec do_write( FHd :: file:descriptor(),
|
-spec do_write( FHd :: file:descriptor(),
|
||||||
|
@ -516,18 +562,118 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data) ->
|
||||||
TaggedCsum :: binary(),
|
TaggedCsum :: binary(),
|
||||||
Offset :: non_neg_integer(),
|
Offset :: non_neg_integer(),
|
||||||
Size :: non_neg_integer(),
|
Size :: non_neg_integer(),
|
||||||
Data :: binary() ) -> ok|term().
|
Data :: binary(),
|
||||||
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data) ->
|
Unwritten :: [byte_sequence()]
|
||||||
|
) -> {ok, NewUnwritten :: [byte_sequence()]} |
|
||||||
|
{error, Reason :: term()}.
|
||||||
|
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) ->
|
||||||
case file:pwrite(FHd, Offset, Data) of
|
case file:pwrite(FHd, Offset, Data) of
|
||||||
ok ->
|
ok ->
|
||||||
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
||||||
[Filename, Offset, Size]),
|
[Filename, Offset, Size]),
|
||||||
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
|
||||||
ok = file:write(FHc, EncodedCsum),
|
ok = file:write(FHc, EncodedCsum),
|
||||||
lager:debug("Successful write to checksum file for ~p.", [Filename]),
|
put({Offset, Size}, TaggedCsum),
|
||||||
ok;
|
NewU = update_unwritten(Offset, Size, U),
|
||||||
|
lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
|
||||||
|
[Filename, NewU]),
|
||||||
|
{ok, NewU};
|
||||||
Other ->
|
Other ->
|
||||||
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
|
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
|
||||||
[Other, Filename, Offset, Size]),
|
[Other, Filename, Offset, Size]),
|
||||||
{error, Other}
|
{error, Other}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec is_byte_range_unwritten( Offset :: non_neg_integer(),
                               Size :: pos_integer(),
                               Unwritten :: [byte_sequence()] ) -> boolean().
% @private Report whether the byte range starting at `Offset' and spanning
% `Size' bytes is still unwritten.
%
% The answer depends on the shape of the unwritten list:
% <ul>
% <li> empty list: a critical message is logged and `false' is returned;</li>
% <li> exactly one entry: that entry must be `{Eof, infinity}' (anything else
%      is a badmatch) and the range counts as unwritten when
%      `Offset >= Eof'; `Size' is not consulted;</li>
% <li> two or more entries: `true' only when `lookup_unwritten/3' finds a
%      single unwritten sequence that the range fits into.</li>
% </ul>
is_byte_range_unwritten(_Offset, _Size, []) ->
    lager:critical("Unwritten byte list has 0 entries! This should never happen."),
    false;
is_byte_range_unwritten(Offset, _Size, [Only]) ->
    %% A lone entry is asserted to be the open-ended tail of the file.
    {Eof, infinity} = Only,
    Offset >= Eof;
is_byte_range_unwritten(Offset, Size, [_, _ | _] = Unwritten) ->
    case lookup_unwritten(Offset, Size, Unwritten) of
        not_found -> false;
        {ok, _} -> true
    end.
|
||||||
|
|
||||||
|
-spec lookup_unwritten( Offset :: non_neg_integer(),
                        Size :: pos_integer(),
                        Unwritten :: [byte_sequence()]
                      ) -> {ok, byte_sequence()} | not_found.
% @private Scan the unwritten byte list, front to back, for the first "hole"
% that can hold a write of `Size' bytes at `Offset'.
%
% An entry `{Pos, infinity}' qualifies as soon as `Offset >= Pos'. A bounded
% entry `{Pos, Space}' qualifies when `Offset' lies inside it and the bytes
% remaining from `Offset' to the end of the hole are at least `Size'.
%
% Returns `{ok, {Position, Space}}' for the first qualifying entry, or
% `not_found' when no entry qualifies (including when the list is empty).
lookup_unwritten(Offset, Size, Unwritten) ->
    %% The tests live in guards on purpose: a guard that cannot be evaluated
    %% (e.g. arithmetic on `infinity') simply fails instead of raising.
    Fits = fun({Pos, infinity}) when Offset >= Pos ->
                   true;
              ({Pos, Space}) when Offset >= Pos andalso Offset < Pos+Space
                                  andalso Size =< (Space - (Offset - Pos)) ->
                   true;
              (_NoFit) ->
                   false
           end,
    case lists:dropwhile(fun(Seq) -> not Fits(Seq) end, Unwritten) of
        [] ->
            not_found;
        [Match | _] ->
            {ok, Match}
    end.
|
||||||
|
|
||||||
|
-spec update_unwritten( Offset :: non_neg_integer(),
                        Size :: pos_integer(),
                        Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].
% @private Return the unwritten byte list as it should look after a write of
% `Size' bytes at `Offset' has completed.
%
% The hole that received the write is located with `lookup_unwritten/3'.
% A hole that was filled exactly is dropped; otherwise the hole is replaced
% by whatever `update_byte_range/3' says is left of it and the list is
% re-sorted. If no hole is found an error is logged and the list is returned
% unchanged.
update_unwritten(Offset, Size, Unwritten) ->
    case lookup_unwritten(Offset, Size, Unwritten) of
        {ok, {Offset, Size}} ->
            %% The write covered this hole from start to end.
            lists:keydelete(Offset, 1, Unwritten);
        {ok, {Start, _} = Hole} ->
            Remaining = lists:keydelete(Start, 1, Unwritten),
            Leftover = update_byte_range(Offset, Size, Hole),
            lists:sort(Remaining ++ Leftover);
        not_found ->
            lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"),
            Unwritten
    end.
|
||||||
|
|
||||||
|
-spec update_byte_range( Offset :: non_neg_integer(),
                         Size :: pos_integer(),
                         Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()].
% @private Given the offset and size of a completed write and the unwritten
% byte sequence it landed in, return the unwritten sequences that remain of
% that sequence (possibly none).
%
% <ul>
% <li> Write at the start of the open-ended tail: the tail moves past the
%      write.</li>
% <li> Write further into the open-ended tail: a bounded hole is left in
%      front of the write and the tail moves past it.</li>
% <li> Write that fills a bounded hole exactly: nothing remains.</li>
% <li> Write at the start of a bounded hole: the rest of the hole
%      remains.</li>
% <li> Write starting inside a bounded hole: the part in front of the write
%      remains, plus the part behind it when that part is non-empty.</li>
% </ul>
%
% Zero-length entries are never produced, so every returned tuple honours
% the `byte_sequence()' type.
update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof ->
    [{Offset + Size, infinity}];
update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof ->
    [{Eof, (Offset - Eof)}, {Offset+Size, infinity}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size == Space ->
    [];
update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space ->
    [{Offset + Size, Space - Size}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos ->
    Leading = {Pos, Offset - Pos},
    %% Bytes left between the end of the write and the end of the hole.
    case (Pos + Space) - (Offset + Size) of
        Trailing when Trailing > 0 ->
            [Leading, {Offset + Size, Trailing}];
        _ ->
            %% The write ran up to the end of the hole: no trailing entry.
            [Leading]
    end.
|
||||||
|
|
||||||
|
|
||||||
|
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
                                                 Size :: pos_integer(),
                                                 Checksum :: binary() }],
                                  LastOffset :: non_neg_integer(),
                                  Acc :: list() ) -> [byte_sequence()].
% @private Given a <b>sorted</b> list of checksum data tuples, return a sorted
% list of unwritten byte ranges.
%
% `LastOffset' is the position just past the written bytes seen so far; the
% caller supplies the starting value. Whenever the next entry begins beyond
% that position, the gap is recorded as a hole. An entry that begins at or
% before that position (adjacent, overlapping or duplicated data) records no
% hole and can only move the position forward, never backward.
%
% The output list <b>always</b> has at least one entry: its last tuple is
% `{End, infinity}', where `End' is the position just past the last written
% byte.
build_unwritten_bytes_list([], Last, Acc) ->
    lists:reverse([{Last, infinity} | Acc]);
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc)
  when CurrentOffset > LastOffset ->
    Hole = CurrentOffset - LastOffset,
    build_unwritten_bytes_list(Rest, CurrentOffset + CurrentSize,
                               [{LastOffset, Hole} | Acc]);
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) ->
    %% No gap in front of this entry; keep the furthest end seen so far.
    build_unwritten_bytes_list(Rest, max(LastOffset, CurrentOffset + CurrentSize), Acc).
|
||||||
|
|
Loading…
Reference in a new issue