diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index e804589..0fe9732 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -68,23 +68,26 @@ -define(TOO_MANY_ERRORS_RATIO, 50). -type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }. - +-type byte_sequence() :: { Offset :: non_neg_integer(), + Size :: pos_integer()|infinity }. + -record(state, { - data_dir :: string() | undefined, - filename :: string() | undefined, - data_path :: string() | undefined, - wedged = false :: boolean(), - csum_file :: string()|undefined, - csum_path :: string()|undefined, - last_write_offset = 0 :: non_neg_integer(), - data_filehandle :: file:filehandle(), - csum_filehandle :: file:filehandle(), - tref :: reference(), %% timer ref - ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations - ops = 0 :: non_neg_integer(), %% sum of all ops - reads = {0, 0} :: op_stats(), - writes = {0, 0} :: op_stats(), - appends = {0, 0} :: op_stats() + data_dir :: string() | undefined, + filename :: string() | undefined, + data_path :: string() | undefined, + wedged = false :: boolean(), + csum_file :: string()|undefined, + csum_path :: string()|undefined, + eof_position = 0 :: non_neg_integer(), + unwritten_bytes = [] :: [byte_sequence()], + data_filehandle :: file:filehandle(), + csum_filehandle :: file:filehandle(), + tref :: reference(), %% timer ref + ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations + ops = 0 :: non_neg_integer(), %% sum of all ops + reads = {0, 0} :: op_stats(), + writes = {0, 0} :: op_stats(), + appends = {0, 0} :: op_stats() }). %% Public API @@ -103,23 +106,25 @@ sync(Pid, Type) -> gen_server:call(Pid, {sync, Type}, ?TIMEOUT). % @doc Read file at offset for length --spec read(Pid :: pid(), Offset :: non_neg_integer(), - Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()}|{error, term()}. +-spec read(Pid :: pid(), + Offset :: non_neg_integer(), + Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} | + {error, Reason :: term()}. read(Pid, Offset, Length) -> gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT). % @doc Write data at offset --spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok. +-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}. write(Pid, Offset, Data) -> write(Pid, Offset, [], Data). % @doc Write data at offset, including the client metadata. ClientMeta is a proplist -% that expects the following keys and values: +% that expects the following keys and values: % --spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(), +-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(), Data :: binary()) -> ok|{error, term()}. write(Pid, Offset, ClientMeta, Data) -> gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT). @@ -132,38 +137,32 @@ append(Pid, Data) -> % @doc Append data to file, supplying client metadata and (if desired) a % reservation for additional space. ClientMeta is a proplist and expects the % same keys as write/4. --spec append(Pid :: pid(), ClientMeta :: proplists:proplist(), +-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(), Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}. append(Pid, ClientMeta, Extra, Data) -> gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT). -%% TODO -%% read_repair(Filename, Offset, Data) ??? -%% makes sense to me, but we could make the write path serve both purposes -%% I suppose... - %% gen_server callbacks % @private init({Filename, DataDir}) -> CsumFile = machi_util:make_csum_filename(DataDir, Filename), {_, DPath} = machi_util:make_data_filename(DataDir, Filename), - LastWriteOffset = case parse_csum_file(CsumFile) of - 0 -> ?MINIMUM_OFFSET; - V -> V - end, + UnwrittenBytes = parse_csum_file(CsumFile), + {Eof, infinity} = lists:last(UnwrittenBytes), {ok, FHd} = file:open(DPath, [read, write, binary, raw]), {ok, FHc} = file:open(CsumFile, [append, binary, raw]), Tref = schedule_tick(), {ok, #state{ - filename = Filename, - data_dir = DataDir, - data_path = DPath, - csum_file = CsumFile, + filename = Filename, + data_dir = DataDir, + data_path = DPath, + csum_file = CsumFile, data_filehandle = FHd, csum_filehandle = FHc, - tref = Tref, - last_write_offset = LastWriteOffset}}. + tref = Tref, + unwritten_bytes = UnwrittenBytes, + eof_position = Eof}}. handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) -> R = file:sync(FHd), @@ -195,29 +194,30 @@ handle_call({sync, all}, _From, State = #state{filename = F, %%% READS -handle_call({read, _Offset, _Length}, _From, +handle_call({read, _Offset, _Length}, _From, State = #state{wedged = true, reads = {T, Err} }) -> {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; -handle_call({read, Offset, Length}, _From, - State = #state{last_write_offset = Last, +handle_call({read, Offset, Length}, _From, + State = #state{eof_position = Eof, reads = {T, Err} - }) when Offset + Length > Last -> - lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p", - [Offset, Length, Last]), + }) when Offset + Length > Eof -> + lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p", + [Offset, Length, Eof]), {reply, {error, not_written}, State = #state{reads = {T + 1, Err + 1}}}; -handle_call({read, Offset, Length}, _From, +handle_call({read, Offset, Length}, _From, State = #state{filename = F, data_filehandle = FH, + unwritten_bytes = U, reads = {T, Err} }) -> Checksum = get({Offset, Length}), %% N.B. Maybe be 'undefined'! - {Resp, NewErr} = case do_read(FH, F, Checksum, Offset, Length) of + {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of {ok, Bytes, Csum} -> {{ok, Bytes, Csum}, Err}; eof -> @@ -229,74 +229,82 @@ handle_call({read, Offset, Length}, _From, %%% WRITES -handle_call({write, _Offset, _ClientMeta, _Data}, _From, +handle_call({write, _Offset, _ClientMeta, _Data}, _From, State = #state{wedged = true, writes = {T, Err} }) -> {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; -handle_call({write, Offset, ClientMeta, Data}, _From, - State = #state{last_write_offset = Last, +handle_call({write, Offset, ClientMeta, Data}, _From, + State = #state{unwritten_bytes = U, filename = F, writes = {T, Err}, data_filehandle = FHd, csum_filehandle = FHc }) -> - ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), - ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), + ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), + ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), - {Resp, NewErr, NewLast} = + {Resp, NewErr, NewU} = case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of {error, {bad_csum, Bad}} -> - lager:error("Bad checksum on write; client sent ~p, we computed ~p", + lager:error("Bad checksum on write; client sent ~p, we computed ~p", [ClientCsum, Bad]), - {{error, bad_csum}, Err + 1, Last}; + {{error, bad_csum}, Err + 1, U}; TaggedCsum -> - case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data) of - ok -> - {ok, Err, Last + Offset}; + case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of + {ok, NewU1} -> + {ok, Err, NewU1}; Error -> - {Error, Err + 1, Last} + {Error, Err + 1, U} end end, - {reply, Resp, State#state{writes = {T+1, NewErr}, last_write_offset = NewLast}}; + {NewEof, infinity} = lists:last(NewU), + {reply, Resp, State#state{writes = {T+1, NewErr}, + eof_position = NewEof, + unwritten_bytes = NewU + }}; %% APPENDS -handle_call({append, _ClientMeta, _Extra, _Data}, _From, +handle_call({append, _ClientMeta, _Extra, _Data}, _From, State = #state{wedged = true, appends = {T, Err} }) -> {reply, {error, wedged}, State#state{appends = {T+1, Err+1}}}; -handle_call({append, ClientMeta, Extra, Data}, _From, - State = #state{last_write_offset = Last, +handle_call({append, ClientMeta, Extra, Data}, _From, + State = #state{eof_position = EofP, + unwritten_bytes = U, filename = F, appends = {T, Err}, data_filehandle = FHd, csum_filehandle = FHc }) -> - ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), - ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), - Size = iolist_size(Data), + ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), + ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), - {Resp, NewErr, NewLast} = + {Resp, NewErr, NewU} = case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of {error, {bad_csum, Bad}} -> lager:error("Bad checksum; client sent ~p, we computed ~p", [ClientCsum, Bad]), - {{error, bad_csum}, Err + 1, Last}; + {{error, bad_csum}, Err + 1, U}; TaggedCsum -> - case handle_write(FHd, FHc, F, TaggedCsum, Last, Data) of - ok -> - {{ok, F, Last}, Err, Last + Size + Extra}; + case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of + {ok, NewU1} -> + {{ok, F, EofP}, Err, NewU1}; Error -> - {Error, Err + 1, Last} + {Error, Err + 1, EofP, U} end end, - {reply, Resp, State#state{appends = {T+1, NewErr}, last_write_offset = NewLast}}; + {NewEof, infinity} = lists:last(NewU), + {reply, Resp, State#state{appends = {T+1, NewErr}, + eof_position = NewEof + Extra, + unwritten_bytes = NewU + }}; handle_call(Req, _From, State) -> lager:warning("Unknown call: ~p", [Req]), @@ -313,7 +321,7 @@ handle_info(tick, State = #state{ reads = {RT, RE}, writes = {WT, WE}, appends = {AT, AE} - }) when Ops > 100 andalso + }) when Ops > 100 andalso trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO -> Errors = RE + WE + AE, lager:notice("Got ~p errors. Shutting down.", [Errors]), @@ -374,7 +382,7 @@ handle_info(Req, State) -> terminate(Reason, #state{ filename = F, - data_filehandle = FHd, + data_filehandle = FHd, csum_filehandle = FHc, reads = {RT, RE}, writes = {WT, WE}, @@ -398,7 +406,7 @@ code_change(_OldVsn, State, _Extra) -> schedule_tick() -> erlang:send_after(?TICK, self(), tick). --spec check_or_make_tagged_csum(Type :: binary(), +-spec check_or_make_tagged_csum(Type :: binary(), Checksum :: binary(), Data :: binary() ) -> binary() | {error, {bad_csum, Bad :: binary()}}. @@ -406,7 +414,7 @@ check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) -> %% We are making a checksum here Csum = machi_util:checksum_chunk(Data), machi_util:make_tagged_csum(server_sha, Csum); -check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA; +check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA; Tag == ?CSUM_TAG_SERVER_SHA -> Csum = machi_util:checksum_chunk(Data), case Csum =:= InCsum of @@ -430,34 +438,62 @@ map_offsets_to_csums(CsumList) -> insert_offsets({Offset, Length, Checksum}) -> put({Offset, Length}, Checksum). +-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()]. parse_csum_file(Filename) -> + %% using file:read_file works as long as the files are "small" {ok, CsumData} = file:read_file(Filename), {DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData), - case DecodedCsums of - [] -> 0; + Sort = lists:sort(DecodedCsums), + case Sort of + [] -> [{?MINIMUM_OFFSET, infinity}]; _ -> map_offsets_to_csums(DecodedCsums), - {Offset, Size, _Csum} = lists:last(DecodedCsums), - Offset + Size + {First, _, _} = hd(Sort), + build_unwritten_bytes_list(Sort, First, []) end. --spec do_read(FHd :: file:filehandle(), - Filename :: string(), - TaggedCsum :: undefined|binary(), - Offset :: non_neg_integer(), - Size :: non_neg_integer()) -> eof | - {ok, Bytes :: binary(), Csum :: binary()} | - {error, bad_csum} | - {error, partial_read} | - {error, Other :: term() }. -do_read(FHd, Filename, undefined, Offset, Size) -> - do_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size); +-spec handle_read(FHd :: file:filehandle(), + Filename :: string(), + TaggedCsum :: undefined|binary(), + Offset :: non_neg_integer(), + Size :: non_neg_integer(), + Unwritten :: [byte_sequence()] + ) -> {ok, Bytes :: binary(), Csum :: binary()} | + eof | + {error, bad_csum} | + {error, partial_read} | + {error, not_written} | + {error, Other :: term() }. +% @private Attempt a read operation on the given offset and length. +%
  • +% +% +% +% +% +%
  • +% +% On success, `{ok, Bytes, Checksum}' is returned. +handle_read(FHd, Filename, undefined, Offset, Size, U) -> + handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U); +handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) -> + case is_byte_range_unwritten(Offset, Size, U) of + true -> + {error, not_written}; + false -> + do_read(FHd, Filename, TaggedCsum, Offset, Size) + end. + +% @private Implements the disk read do_read(FHd, Filename, TaggedCsum, Offset, Size) -> case file:pread(FHd, Offset, Size) of eof -> eof; - {ok, Bytes} when byte_size(Bytes) == Size -> {Type, Ck} = machi_util:unmake_tagged_csum(TaggedCsum), case check_or_make_tagged_csum(Type, Ck, Bytes) of @@ -468,12 +504,10 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) -> TaggedCsum -> {ok, Bytes, TaggedCsum} end; - {ok, Partial} -> lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", [Filename, Offset, Size, byte_size(Partial)]), {error, partial_read}; - Other -> lager:error("While reading file ~p, offset ~p, length ~p, got ~p", [Filename, Offset, Size, Other]), @@ -485,29 +519,41 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) -> Filename :: string(), TaggedCsum :: binary(), Offset :: non_neg_integer(), - Data :: binary() ) -> ok | - {error, written} | - {error, Reason :: term()}. -handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data) -> + Data :: binary(), + Unwritten :: [byte_sequence()] + ) -> {ok, NewU :: [byte_sequence()]} | + {error, written} | + {error, Reason :: term()}. +handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) -> Size = iolist_size(Data), - case do_read(FHd, Filename, TaggedCsum, Offset, Size) of - eof -> + + case is_byte_range_unwritten(Offset, Size, U) of + false -> + case get({Offset, Size}) of + undefined -> + {error, written}; + TaggedCsum -> + case do_read(FHd, Filename, TaggedCsum, Offset, Size) of + eof -> + lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written", + [Offset, Filename]), + {error, server_insanity}; + {ok, _, _} -> + {ok, U}; + _ -> + {error, written} + end + end; + true -> try - do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data) + do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) catch %%% XXX FIXME: be more specific on badmatch that might - %%% occur around line 520 when we write the checksum + %%% occur around line 533 when we write the checksum %%% file entry for the data blob we just put on the disk error:Reason -> {error, Reason} - end; - {ok, _, _} -> - % yep, we did that write! Honest. - ok; - {error, Error} -> - lager:error("During write to ~p, offset ~p, got error ~p; returning {error, written}", - [Filename, Offset, Error]), - {error, written} + end end. -spec do_write( FHd :: file:descriptor(), @@ -516,18 +562,118 @@ handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data) -> TaggedCsum :: binary(), Offset :: non_neg_integer(), Size :: non_neg_integer(), - Data :: binary() ) -> ok|term(). -do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data) -> + Data :: binary(), + Unwritten :: [byte_sequence()] + ) -> {ok, NewUnwritten :: [byte_sequence()]} | + {error, Reason :: term()}. +do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) -> case file:pwrite(FHd, Offset, Data) of ok -> lager:debug("Successful write in file ~p at offset ~p, length ~p", [Filename, Offset, Size]), EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum), ok = file:write(FHc, EncodedCsum), - lager:debug("Successful write to checksum file for ~p.", [Filename]), - ok; + put({Offset, Size}, TaggedCsum), + NewU = update_unwritten(Offset, Size, U), + lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p", + [Filename, NewU]), + {ok, NewU}; Other -> lager:error("Got ~p during write to file ~p at offset ~p, length ~p", [Other, Filename, Offset, Size]), {error, Other} end. + +-spec is_byte_range_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] ) -> boolean(). +% @private Given an offset and a size, return `true' if a byte range has +% not been written. Otherwise, return `false'. +is_byte_range_unwritten(Offset, Size, Unwritten) -> + case length(Unwritten) of + 0 -> + lager:critical("Unwritten byte list has 0 entries! This should never happen."), + false; + 1 -> + {Eof, infinity} = hd(Unwritten), + Offset >= Eof; + _ -> + case lookup_unwritten(Offset, Size, Unwritten) of + {ok, _} -> true; + not_found -> false + end + end. + +-spec lookup_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] + ) -> {ok, byte_sequence()} | not_found. +% @private Given an offset and a size, scan the list of unwritten bytes and +% look for a "hole" where a write might be allowed if any exist. If a +% suitable byte sequence is found, the function returns a tuple of {ok, +% {Position, Space}} is returned. `not_found' is returned if no suitable +% space is located. +lookup_unwritten(_Offset, _Size, []) -> + not_found; +lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos -> + {ok, H}; +lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest]) + when Offset >= Pos andalso Offset < Pos+Space + andalso Size =< (Space - (Offset - Pos)) -> + {ok, H}; +lookup_unwritten(Offset, Size, [_H|Rest]) -> + lookup_unwritten(Offset, Size, Rest). + +-spec update_unwritten( Offset :: non_neg_integer(), + Size :: pos_integer(), + Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()]. +% @private Given an offset, a size and the unwritten byte list, return an updated +% and sorted unwritten byte list accounting for any completed write operation. +update_unwritten(Offset, Size, Unwritten) -> + case lookup_unwritten(Offset, Size, Unwritten) of + not_found -> + lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"), + Unwritten; + {ok, {Offset, Size}} -> + %% we neatly filled in our hole... + lists:keydelete(Offset, 1, Unwritten); + {ok, S={Pos, _}} -> + lists:sort(lists:keydelete(Pos, 1, Unwritten) ++ + update_byte_range(Offset, Size, S)) + end. + +-spec update_byte_range( Offset :: non_neg_integer(), + Size :: pos_integer(), + Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()]. +% @private Given an offset and size and a byte sequence tuple where a +% write took place, return a list of updates to the list of unwritten bytes +% accounting for the space occupied by the just completed write. +update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof -> + [{Offset + Size, infinity}]; +update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof -> + [{Eof, (Offset - Eof)}, {Offset+Size, infinity}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space -> + [{Offset + Size, Space - Size}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos -> + [{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}]. + + +-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(), + Size :: pos_integer(), + Checksum :: binary() }], + LastOffset :: non_neg_integer(), + Acc :: list() ) -> [byte_sequence()]. +% @private Given a sorted list of checksum data tuples, return a sorted +% list of unwritten byte ranges. The output list always has at least one +% entry: the last tuple in the list is guaranteed to be the current end of +% bytes written to a particular file with the special space moniker +% `infinity'. +build_unwritten_bytes_list([], Last, Acc) -> + NewAcc = [ {Last, infinity} | Acc ], + lists:reverse(NewAcc); +build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when + CurrentOffset /= LastOffset -> + Hole = CurrentOffset - LastOffset, + build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]); +build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) -> + build_unwritten_bytes_list(Rest, CO + CS, Acc).