diff --git a/.gitignore b/.gitignore
index 5243bad..30f2cd7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,8 @@ erl_crash.dump
 .concrete/DEV_MODE
 .rebar
 edoc
+# ignore vim swap files
+*.swp
 
 # PB artifacts for Erlang
 include/machi_pb.hrl
diff --git a/include/machi.hrl b/include/machi.hrl
index 4421344..ae6c554 100644
--- a/include/machi.hrl
+++ b/include/machi.hrl
@@ -23,6 +23,7 @@
 %% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
 -define(DATA_DIR, "./data").
 -define(MINIMUM_OFFSET, 1024).
+-define(FN_DELIMITER, "^").
 
 %% 0th draft of checksum typing with 1st byte.
 -define(CSUM_TAG_NONE, 0). % No csum provided by client
diff --git a/rebar.config b/rebar.config
index bb37270..50b68c9 100644
--- a/rebar.config
+++ b/rebar.config
@@ -6,6 +6,7 @@
 {deps, [
         {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}},
-        {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}
+        {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
+        {ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {tag, "1.1.0"}}}
 ]}.
diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl
new file mode 100644
index 0000000..6fb6b35
--- /dev/null
+++ b/src/machi_file_proxy.erl
@@ -0,0 +1,778 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License.  You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied.  See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc This is a proxy process which mediates access to Machi
+%% FLU-controlled files. In particular, it manages the "write-once register"
+%% conceit at the heart of Machi's design.
+%%
+%% Read, write and append requests for a single file will be managed
+%% through this proxy.  Clients can also request syncs for specific
+%% types of filehandles.
+%%
+%% As operations are requested, the proxy keeps track of how many
+%% operations it has performed (and how many errors were generated).
+%% After a sufficient period of inactivity, the server terminates
+%% itself.
+%%
+%% TODO:
+%% 1. Some way to transition the proxy into a wedged state that
+%% doesn't rely on message delivery.
+%%
+%% 2. Check max file size on appends. Writes we take on faith we can
+%% and should handle.
+%%
+%% 3. Async checksum reads on startup.
+
+-module(machi_file_proxy).
+-behaviour(gen_server).
+
+-include("machi.hrl").
+
+%% public API
+-export([
+    start_link/2,
+    stop/1,
+    sync/1,
+    sync/2,
+    read/3,
+    write/3,
+    write/4,
+    append/2,
+    append/4
+]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
+-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
+-define(TIMEOUT, 10*1000).
+-define(TOO_MANY_ERRORS_RATIO, 50).
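Taken together, the three timing macros above determine the proxy's idle lifetime: a quiescent proxy receives one `tick' every 30 seconds, increments its tick counter each time, and stops itself once the counter reaches `?TICK_THRESHOLD', i.e. after roughly (5 + 1) * 30 = 180 seconds with no new operations.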
+
+-type op_stats()      :: { Total  :: non_neg_integer(),
+                           Errors :: non_neg_integer() }.
+
+-type byte_sequence() :: { Offset :: non_neg_integer(),
+                           Size   :: pos_integer()|infinity }.
+
+-record(state, {
+    data_dir              :: string() | undefined,
+    filename              :: string() | undefined,
+    data_path             :: string() | undefined,
+    wedged = false        :: boolean(),
+    csum_file             :: string()|undefined,
+    csum_path             :: string()|undefined,
+    eof_position = 0      :: non_neg_integer(),
+    unwritten_bytes = []  :: [byte_sequence()],
+    data_filehandle       :: file:filehandle(),
+    csum_filehandle       :: file:filehandle(),
+    tref                  :: reference(), %% timer ref
+    ticks = 0             :: non_neg_integer(), %% ticks elapsed with no new operations
+    ops = 0               :: non_neg_integer(), %% sum of all ops
+    reads = {0, 0}        :: op_stats(),
+    writes = {0, 0}       :: op_stats(),
+    appends = {0, 0}      :: op_stats()
+}).
+
+%% Public API
+
+% @doc Start a new instance of the file proxy service. Takes the filename
+% and data directory as arguments. This function is typically called by the
+% `machi_file_proxy_sup:start_proxy/2' function.
+-spec start_link(Filename :: string(), DataDir :: string()) -> any().
+start_link(Filename, DataDir) ->
+    gen_server:start_link(?MODULE, {Filename, DataDir}, []).
+
+% @doc Request to stop an instance of the file proxy service.
+-spec stop(Pid :: pid()) -> ok.
+stop(Pid) when is_pid(Pid) ->
+    gen_server:call(Pid, {stop}, ?TIMEOUT).
+
+% @doc Force a sync of all filehandles.
+-spec sync(Pid :: pid()) -> ok|{error, term()}.
+sync(Pid) when is_pid(Pid) ->
+    sync(Pid, all);
+sync(_Pid) ->
+    lager:warning("Bad pid to sync"),
+    {error, bad_arg}.
+
+% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
+-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
+sync(Pid, Type) when is_pid(Pid) andalso
+                     ( Type =:= all orelse Type =:= csum orelse Type =:= data ) ->
+    gen_server:call(Pid, {sync, Type}, ?TIMEOUT);
+sync(_Pid, Type) ->
+    lager:warning("Bad arg to sync: Type ~p", [Type]),
+    {error, bad_arg}.
+
+% @doc Read file at offset for length.
+-spec read(Pid :: pid(),
+           Offset :: non_neg_integer(),
+           Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
+                                           {error, Reason :: term()}.
+read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
+                               andalso is_integer(Length) andalso Length > 0 ->
+    gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
+read(_Pid, Offset, Length) ->
+    lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
+    {error, bad_arg}.
+
+% @doc Write data at offset.
+-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
+write(Pid, Offset, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
+                              andalso is_binary(Data) ->
+    write(Pid, Offset, [], Data);
+write(_Pid, Offset, _Data) ->
+    lager:warning("Bad arg to write: Offset ~p", [Offset]),
+    {error, bad_arg}.
+
+% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
+% that expects the following keys and values:
+% <ul>
+% <li>`client_csum_tag' - the checksum tag type (see the `?CSUM_TAG_*'
+%     defines in `machi.hrl'); defaults to `?CSUM_TAG_NONE'</li>
+% <li>`client_csum' - the client's checksum of `Data', if one was provided</li>
+% </ul>
+-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
+            Data :: binary()) -> ok|{error, term()}.
+write(Pid, Offset, ClientMeta, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 + andalso is_list(ClientMeta) andalso is_binary(Data) -> + gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT); +write(_Pid, Offset, ClientMeta, _Data) -> + lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]), + {error, bad_arg}. + +% @doc Append data +-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} + |{error, term()}. +append(Pid, Data) when is_pid(Pid) andalso is_binary(Data) -> + append(Pid, [], 0, Data); +append(_Pid, _Data) -> + lager:warning("Bad arguments to append/2"), + {error, bad_arg}. + +% @doc Append data to file, supplying client metadata and (if desired) a +% reservation for additional space. ClientMeta is a proplist and expects the +% same keys as write/4. +-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(), + Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} + |{error, term()}. +append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta) + andalso is_integer(Extra) andalso Extra >= 0 + andalso is_binary(Data) -> + gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT); +append(_Pid, ClientMeta, Extra, _Data) -> + lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]), + {error, bad_arg}. + +%% gen_server callbacks + +% @private +init({Filename, DataDir}) -> + CsumFile = machi_util:make_checksum_filename(DataDir, Filename), + {_, DPath} = machi_util:make_data_filename(DataDir, Filename), + ok = filelib:ensure_dir(CsumFile), + ok = filelib:ensure_dir(DPath), + UnwrittenBytes = parse_csum_file(CsumFile), + {Eof, infinity} = lists:last(UnwrittenBytes), + {ok, FHd} = file:open(DPath, [read, write, binary, raw]), + {ok, FHc} = file:open(CsumFile, [append, binary, raw]), + Tref = schedule_tick(), + St = #state{ + filename = Filename, + data_dir = DataDir, + data_path = DPath, + csum_file = CsumFile, + data_filehandle = FHd, + csum_filehandle = FHc, + tref = Tref, + unwritten_bytes = UnwrittenBytes, + eof_position = Eof}, + lager:debug("Starting file proxy ~p for filename ~p, state = ~p", + [self(), Filename, St]), + {ok, St}. 
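To make the `ClientMeta' convention concrete, here is a minimal usage sketch (illustrative only: the function name `demo_write_read/1' and the offset are invented; `machi_util:checksum_chunk/1' and the `?CSUM_TAG_CLIENT_SHA' tag appear elsewhere in this change):

    %% Hypothetical usage sketch; not part of the patch.
    demo_write_read(Pid) ->
        Data = <<"write-once block">>,
        CSum = machi_util:checksum_chunk(Data),
        Meta = [{client_csum_tag, ?CSUM_TAG_CLIENT_SHA},
                {client_csum, CSum}],
        %% First write at a fresh offset succeeds...
        ok = machi_file_proxy:write(Pid, 1024, Meta, Data),
        %% ...and reading the same range returns the data plus its
        %% server-tagged checksum.
        {ok, Data, _TaggedCSum} =
            machi_file_proxy:read(Pid, 1024, byte_size(Data)).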
+
+% @private
+handle_call({stop}, _From, State) ->
+    lager:debug("Requested to stop."),
+    {stop, normal, State};
+
+handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
+    R = file:sync(FHd),
+    {reply, R, State};
+
+handle_call({sync, csum}, _From, State = #state{ csum_filehandle = FHc }) ->
+    R = file:sync(FHc),
+    {reply, R, State};
+
+handle_call({sync, all}, _From, State = #state{filename = F,
+                                               data_filehandle = FHd,
+                                               csum_filehandle = FHc
+                                              }) ->
+    R = file:sync(FHc),
+    R1 = file:sync(FHd),
+    Resp = case {R, R1} of
+               {ok, ok} -> ok;
+               {ok, O1} ->
+                   lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
+                   O1;
+               {O2, ok} ->
+                   lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
+                   O2;
+               {O3, O4} ->
+                   lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]),
+                   {O3, O4}
+           end,
+    {reply, Resp, State};
+
+%%% READS
+
+handle_call({read, _Offset, _Length}, _From,
+            State = #state{wedged = true,
+                           reads = {T, Err}
+                          }) ->
+    {reply, {error, wedged}, State#state{reads = {T + 1, Err + 1}}};
+
+handle_call({read, Offset, Length}, _From,
+            State = #state{eof_position = Eof,
+                           reads = {T, Err}
+                          }) when Offset + Length > Eof ->
+    lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
+                [Offset, Length, Eof]),
+    {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
+
+handle_call({read, Offset, Length}, _From,
+            State = #state{filename = F,
+                           data_filehandle = FH,
+                           unwritten_bytes = U,
+                           reads = {T, Err}
+                          }) ->
+
+    Checksum = get({Offset, Length}), %% N.B. May be 'undefined'!
+
+    {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
+                         {ok, Bytes, Csum} ->
+                             {{ok, Bytes, Csum}, Err};
+                         eof ->
+                             {{error, not_written}, Err + 1};
+                         Error ->
+                             {Error, Err + 1}
+                     end,
+    {reply, Resp, State#state{reads = {T+1, NewErr}}};
+
+%%% WRITES
+
+handle_call({write, _Offset, _ClientMeta, _Data}, _From,
+            State = #state{wedged = true,
+                           writes = {T, Err}
+                          }) ->
+    {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
+
+handle_call({write, Offset, ClientMeta, Data}, _From,
+            State = #state{unwritten_bytes = U,
+                           filename = F,
+                           writes = {T, Err},
+                           data_filehandle = FHd,
+                           csum_filehandle = FHc
+                          }) ->
+
+    ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
+    ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
+
+    {Resp, NewErr, NewU} =
+        case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
+            {error, {bad_csum, Bad}} ->
+                lager:error("Bad checksum on write; client sent ~p, we computed ~p",
+                            [ClientCsum, Bad]),
+                {{error, bad_csum}, Err + 1, U};
+            TaggedCsum ->
+                case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of
+                    {ok, NewU1} ->
+                        {ok, Err, NewU1};
+                    Error ->
+                        {Error, Err + 1, U}
+                end
+        end,
+    {NewEof, infinity} = lists:last(NewU),
+    {reply, Resp, State#state{writes = {T+1, NewErr},
+                              eof_position = NewEof,
+                              unwritten_bytes = NewU
+                             }};
+
+%% APPENDS
+
+handle_call({append, _ClientMeta, _Extra, _Data}, _From,
+            State = #state{wedged = true,
+                           appends = {T, Err}
+                          }) ->
+    {reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};
+
+handle_call({append, ClientMeta, Extra, Data}, _From,
+            State = #state{eof_position = EofP,
+                           unwritten_bytes = U,
+                           filename = F,
+                           appends = {T, Err},
+                           data_filehandle = FHd,
+                           csum_filehandle = FHc
+                          }) ->
+
+    ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
+    ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
+
+    {Resp, NewErr, NewU} =
+        case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
+            {error, {bad_csum, Bad}} ->
+                lager:error("Bad checksum; client sent ~p, we computed ~p",
+                            [ClientCsum, Bad]),
+                {{error, bad_csum}, Err + 1, U};
+            TaggedCsum ->
+                case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of
+                    {ok, NewU1} ->
+                        {{ok, F, EofP}, Err, NewU1};
+                    Error ->
+                        {Error, Err + 1, U}
+                end
+        end,
+    {NewEof, infinity} = lists:last(NewU),
+    {reply, Resp, State#state{appends = {T+1, NewErr},
+                              eof_position = NewEof + Extra,
+                              unwritten_bytes = NewU
+                             }};
+
+handle_call(Req, _From, State) ->
+    lager:warning("Unknown call: ~p", [Req]),
+    {reply, whoaaaaaaaaaaaa, State}.
+
+% @private
+handle_cast(Cast, State) ->
+    lager:warning("Unknown cast: ~p", [Cast]),
+    {noreply, State}.
+
+% @private
+handle_info(tick, State = #state{eof_position = Eof}) when Eof >= ?MAX_FILE_SIZE ->
+    lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
+                 [Eof, ?MAX_FILE_SIZE]),
+    {stop, file_rollover, State};
+
+%% XXX Is this a good idea? Need to think this through a bit.
+handle_info(tick, State = #state{wedged = true}) ->
+    {stop, wedged, State};
+
+%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of
+%% errors, we ought to shut down and give up our file descriptors.
+handle_info(tick, State = #state{
+                             ops = Ops,
+                             reads = {RT, RE},
+                             writes = {WT, WE},
+                             appends = {AT, AE}
+                            }) when Ops > 100 andalso
+                                    trunc(((RE+WE+AE) / (RT+WT+AT)) * 100) > ?TOO_MANY_ERRORS_RATIO ->
+    Errors = RE + WE + AE,
+    lager:notice("Got ~p errors. Shutting down.", [Errors]),
+    {stop, too_many_errors, State};
+
+handle_info(tick, State = #state{
+                             ticks = Ticks,
+                             ops = Ops,
+                             reads = {RT, _RE},
+                             writes = {WT, _WE},
+                             appends = {AT, _AE}}) when Ops == RT + WT + AT, Ticks == ?TICK_THRESHOLD ->
+    lager:debug("Got ~p ticks with no new activity. Shutting down.", [?TICK_THRESHOLD]),
+    {stop, normal, State};
+
+handle_info(tick, State = #state{
+                             ticks = Ticks,
+                             ops = Ops,
+                             reads = {RT, _RE},
+                             writes = {WT, _WE},
+                             appends = {AT, _AE}}) when Ops == RT + WT + AT ->
+    lager:debug("No new activity since last tick. Incrementing tick counter."),
+    Tref = schedule_tick(),
+    {noreply, State#state{tref = Tref, ticks = Ticks + 1}};
+
+handle_info(tick, State = #state{
+                             reads = {RT, _RE},
+                             writes = {WT, _WE},
+                             appends = {AT, _AE}
+                            }) ->
+    Ops = RT + WT + AT,
+    lager:debug("Setting ops counter to ~p", [Ops]),
+    Tref = schedule_tick(),
+    {noreply, State#state{tref = Tref, ops = Ops}};
+
+%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E /= EpochId ->
+%    lager:notice("Wedge epoch ~p but ignoring because our epoch id is ~p", [EpochId, E]),
+%    {noreply, State};
+
+%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E == EpochId ->
+%    lager:notice("Wedge epoch ~p same as our epoch id ~p; we are wedged. Bummer.", [EpochId, E]),
+%    {noreply, State#state{wedged = true}};
+
+% flu1.erl:
+% ProxyPid = get_proxy_pid(Filename),
+% Are we wedged? if not
+% machi_file_proxy:read(Pid, Offset, Length)
+% otherwise -> error,wedged
+%
+% get_proxy_pid(Filename) ->
+%   Pid = lookup_pid(Filename)
+%   is_pid_alive(Pid)
+%   Pid
+%   if not alive then start one
+
+handle_info(Req, State) ->
+    lager:warning("Unknown info message: ~p", [Req]),
+    {noreply, State}.
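The error-ratio guard above is easier to read as a standalone predicate; a hypothetical restatement (`too_many_errors/4' is invented for illustration, assuming the parenthesized total shown above):

    %% Example: 120 reads with 70 errors and no writes or appends gives
    %% trunc((70 / 120) * 100) = 58 > ?TOO_MANY_ERRORS_RATIO (50), so the
    %% proxy would stop at the next tick.
    too_many_errors(Ops, {RT, RE}, {WT, WE}, {AT, AE}) ->
        Errors = RE + WE + AE,
        Total  = RT + WT + AT,
        Ops > 100 andalso trunc((Errors / Total) * 100) > ?TOO_MANY_ERRORS_RATIO.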
+
+% @private
+terminate(Reason, #state{filename = F,
+                         data_filehandle = FHd,
+                         csum_filehandle = FHc,
+                         reads = {RT, RE},
+                         writes = {WT, WE},
+                         appends = {AT, AE}
+                        }) ->
+    lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]),
+    lager:info("   Op    Tot/Error", []),
+    lager:info("  Reads:  ~p/~p", [RT, RE]),
+    lager:info(" Writes:  ~p/~p", [WT, WE]),
+    lager:info("Appends:  ~p/~p", [AT, AE]),
+    ok = file:sync(FHd),
+    ok = file:sync(FHc),
+    ok = file:close(FHd),
+    ok = file:close(FHc),
+    ok.
+
+% @private
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Private functions
+
+-spec schedule_tick() -> reference().
+schedule_tick() ->
+    erlang:send_after(?TICK, self(), tick).
+
+-spec check_or_make_tagged_csum(Type     :: binary(),
+                                Checksum :: binary(),
+                                Data     :: binary() ) -> binary() |
+                                                          {error, {bad_csum, Bad :: binary()}}.
+check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
+    %% We are making a checksum here
+    Csum = machi_util:checksum_chunk(Data),
+    machi_util:make_tagged_csum(server_sha, Csum);
+check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
+                                                  Tag == ?CSUM_TAG_SERVER_SHA ->
+    Csum = machi_util:checksum_chunk(Data),
+    case Csum =:= InCsum of
+        true ->
+            machi_util:make_tagged_csum(server_sha, Csum);
+        false ->
+            {error, {bad_csum, Csum}}
+    end;
+check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
+    lager:warning("Unknown checksum tag ~p", [OtherTag]),
+    {error, bad_csum}.
+
+encode_csum_file_entry(Offset, Size, TaggedCSum) ->
+    Len = 8 + 4 + byte_size(TaggedCSum),
+    [<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
+     TaggedCSum].
+
+map_offsets_to_csums(CsumList) ->
+    lists:foreach(fun insert_offsets/1, CsumList).
+
+insert_offsets({Offset, Length, Checksum}) ->
+    put({Offset, Length}, Checksum).
+
+-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
+parse_csum_file(Filename) ->
+    %% using file:read_file works as long as the files are "small"
+    try
+        {ok, CsumData} = file:read_file(Filename),
+        {DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
+        Sort = lists:sort(DecodedCsums),
+        case Sort of
+            [] -> [{?MINIMUM_OFFSET, infinity}];
+            _ ->
+                map_offsets_to_csums(DecodedCsums),
+                {First, _, _} = hd(Sort),
+                build_unwritten_bytes_list(Sort, First, [])
+        end
+    catch
+        _:{badmatch, {error, enoent}} ->
+            [{?MINIMUM_OFFSET, infinity}]
+    end.
+
+-spec handle_read(FHd        :: file:filehandle(),
+                  Filename   :: string(),
+                  TaggedCsum :: undefined|binary(),
+                  Offset     :: non_neg_integer(),
+                  Size       :: non_neg_integer(),
+                  Unwritten  :: [byte_sequence()]
+                 ) -> {ok, Bytes :: binary(), Csum :: binary()} |
+                      eof |
+                      {error, bad_csum} |
+                      {error, partial_read} |
+                      {error, not_written} |
+                      {error, Other :: term() }.
+% @private Attempt a read operation on the given offset and length.
+%
+% <ul>
+% <li> If the byte range is not yet written, `{error, not_written}' is
+%      returned. </li>
+% <li> If the byte range is written, the data is read and its checksum is
+%      validated against the stored checksum (if any). </li>
+% </ul>
+%
+% On success, `{ok, Bytes, Checksum}' is returned.
+handle_read(FHd, Filename, undefined, Offset, Size, U) ->
+    handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U);
+
+handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
+    case is_byte_range_unwritten(Offset, Size, U) of
+        true ->
+            {error, not_written};
+        false ->
+            do_read(FHd, Filename, TaggedCsum, Offset, Size)
+    end.
+
+do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
+    case file:pread(FHd, Offset, Size) of
+        eof ->
+            eof;
+        {ok, Bytes} when byte_size(Bytes) == Size ->
+            {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
+            case check_or_make_tagged_csum(Tag, Ck, Bytes) of
+                {error, Bad} ->
+                    lager:error("Bad checksum; got ~p, expected ~p",
+                                [Bad, Ck]),
+                    {error, bad_csum};
+                TaggedCsum ->
+                    {ok, Bytes, TaggedCsum};
+                %% XXX FIXME: Should we return something other than
+                %% {ok, ....} in this case?
+                OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
+                    {ok, Bytes, OtherCsum}
+            end;
+        {ok, Partial} ->
+            lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
+                        [Filename, Offset, Size, byte_size(Partial)]),
+            {error, partial_read};
+        Other ->
+            lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
+                        [Filename, Offset, Size, Other]),
+            {error, Other}
+    end.
+
+-spec handle_write( FHd        :: file:filehandle(),
+                    FHc        :: file:filehandle(),
+                    Filename   :: string(),
+                    TaggedCsum :: binary(),
+                    Offset     :: non_neg_integer(),
+                    Data       :: binary(),
+                    Unwritten  :: [byte_sequence()]
+                  ) -> {ok, NewU :: [byte_sequence()]} |
+                       {error, written} |
+                       {error, Reason :: term()}.
+% @private Implements the write and append operation. The first task is to
+% determine whether the byte range at the given offset and size has already
+% been written. If not, the write is allowed to proceed. A special case is
+% made when the offset and data size match an existing checksum entry. In
+% that case we read the data off the disk, validate the checksum and return
+% a "fake" ok response as if the write had been performed, when it really
+% hasn't.
+%
+% If a write proceeds, the offset, size and checksum are written to a metadata
+% file, and the internal list of unwritten bytes is modified to reflect the
+% just-performed write.  This is then returned to the caller as
+% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
+handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
+    Size = iolist_size(Data),
+
+    case is_byte_range_unwritten(Offset, Size, U) of
+        false ->
+            case get({Offset, Size}) of
+                undefined ->
+                    {error, written};
+                TaggedCsum ->
+                    case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
+                        eof ->
+                            lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
+                                          [Offset, Filename]),
+                            {error, server_insanity};
+                        {ok, _, _} ->
+                            {ok, U};
+                        _ ->
+                            {error, written}
+                    end;
+                OtherCsum ->
+                    %% Got a checksum, but it doesn't match the data block's
+                    lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to write has checksum ~p",
+                                [Offset, Filename, OtherCsum, TaggedCsum]),
+                    {error, written}
+            end;
+        true ->
+            try
+                do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
+            catch
+                %%% XXX FIXME: be more specific on badmatch that might
+                %%% occur around line 593 when we write the checksum
+                %%% file entry for the data blob we just put on the disk
+                error:Reason ->
+                    {error, Reason}
+            end
+    end.
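The "fake ok" rule makes writes idempotent for identical bytes; a hypothetical shell session against a running proxy `Pid' shows the three possible outcomes at one offset:

    1> machi_file_proxy:write(Pid, 2048, <<"A">>).
    ok
    2> machi_file_proxy:write(Pid, 2048, <<"A">>).   % same bytes again: "fake" ok
    ok
    3> machi_file_proxy:write(Pid, 2048, <<"B">>).   % different bytes: write-once refusal
    {error, written}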
+
+% @private Implements the disk writes for both the write and append
+% operations.
+-spec do_write( FHd        :: file:descriptor(),
+                FHc        :: file:descriptor(),
+                Filename   :: string(),
+                TaggedCsum :: binary(),
+                Offset     :: non_neg_integer(),
+                Size       :: non_neg_integer(),
+                Data       :: binary(),
+                Unwritten  :: [byte_sequence()]
+              ) -> {ok, NewUnwritten :: [byte_sequence()]} |
+                   {error, Reason :: term()}.
+do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) ->
+    case file:pwrite(FHd, Offset, Data) of
+        ok ->
+            lager:debug("Successful write in file ~p at offset ~p, length ~p",
+                        [Filename, Offset, Size]),
+            EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
+            ok = file:write(FHc, EncodedCsum),
+            put({Offset, Size}, TaggedCsum),
+            NewU = update_unwritten(Offset, Size, U),
+            lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
+                        [Filename, NewU]),
+            {ok, NewU};
+        Other ->
+            lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
+                        [Other, Filename, Offset, Size]),
+            {error, Other}
+    end.
+
+-spec is_byte_range_unwritten( Offset    :: non_neg_integer(),
+                               Size      :: pos_integer(),
+                               Unwritten :: [byte_sequence()] ) -> boolean().
+% @private Given an offset and a size, return `true' if a byte range has
+% not been written. Otherwise, return `false'.
+is_byte_range_unwritten(Offset, Size, Unwritten) ->
+    case length(Unwritten) of
+        0 ->
+            lager:critical("Unwritten byte list has 0 entries! This should never happen."),
+            false;
+        1 ->
+            {Eof, infinity} = hd(Unwritten),
+            Offset >= Eof;
+        _ ->
+            case lookup_unwritten(Offset, Size, Unwritten) of
+                {ok, _} -> true;
+                not_found -> false
+            end
+    end.
+
+-spec lookup_unwritten( Offset    :: non_neg_integer(),
+                        Size      :: pos_integer(),
+                        Unwritten :: [byte_sequence()]
+                      ) -> {ok, byte_sequence()} | not_found.
+% @private Given an offset and a size, scan the list of unwritten bytes and
+% look for a "hole" into which a write of that size would fit. If a suitable
+% byte sequence is found, `{ok, {Position, Space}}' is returned; otherwise
+% `not_found' is returned.
+lookup_unwritten(_Offset, _Size, []) ->
+    not_found;
+lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos ->
+    {ok, H};
+lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest])
+  when Offset >= Pos andalso Offset < Pos+Space
+       andalso Size =< (Space - (Offset - Pos)) ->
+    {ok, H};
+lookup_unwritten(Offset, Size, [_H|Rest]) ->
+    %% These are not the droids you're looking for.
+    lookup_unwritten(Offset, Size, Rest).
+
+%%% if the pos is greater than offset + size then we're done. End early.
+
+-spec update_unwritten( Offset    :: non_neg_integer(),
+                        Size      :: pos_integer(),
+                        Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].
+% @private Given an offset, a size and the unwritten byte list, return an updated
+% and sorted unwritten byte list accounting for any completed write operation.
+update_unwritten(Offset, Size, Unwritten) ->
+    case lookup_unwritten(Offset, Size, Unwritten) of
+        not_found ->
+            lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"),
+            Unwritten;
+        {ok, {Offset, Size}} ->
+            %% we neatly filled in our hole...
+            lists:keydelete(Offset, 1, Unwritten);
+        {ok, S={Pos, _}} ->
+            lists:sort(lists:keydelete(Pos, 1, Unwritten) ++
+                       update_byte_range(Offset, Size, S))
+    end.
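A worked example of this bookkeeping, using illustrative values: starting from a fresh file whose unwritten list is `[{1024, infinity}]', a 512-byte write at offset 2048 leaves a 1024-byte hole before the write and a new infinite tail after it.

    U0 = [{1024, infinity}],
    {ok, {1024, infinity}} = lookup_unwritten(2048, 512, U0),
    [{1024, 1024}, {2560, infinity}] = update_unwritten(2048, 512, U0).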
+ +-spec update_byte_range( Offset :: non_neg_integer(), + Size :: pos_integer(), + Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()]. +% @private Given an offset and size and a byte sequence tuple where a +% write took place, return a list of updates to the list of unwritten bytes +% accounting for the space occupied by the just completed write. +update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof -> + [{Offset + Size, infinity}]; +update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof -> + [{Eof, (Offset - Eof)}, {Offset+Size, infinity}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space -> + [{Offset + Size, Space - Size}]; +update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos -> + [{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}]. + + +-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(), + Size :: pos_integer(), + Checksum :: binary() }], + LastOffset :: non_neg_integer(), + Acc :: list() ) -> [byte_sequence()]. +% @private Given a sorted list of checksum data tuples, return a sorted +% list of unwritten byte ranges. The output list always has at least one +% entry: the last tuple in the list is guaranteed to be the current end of +% bytes written to a particular file with the special space moniker +% `infinity'. +build_unwritten_bytes_list([], Last, Acc) -> + NewAcc = [ {Last, infinity} | Acc ], + lists:reverse(NewAcc); +build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when + CurrentOffset /= LastOffset -> + Hole = CurrentOffset - LastOffset, + build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]); +build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) -> + build_unwritten_bytes_list(Rest, CO + CS, Acc). diff --git a/src/machi_file_proxy_sup.erl b/src/machi_file_proxy_sup.erl new file mode 100644 index 0000000..bd5cec2 --- /dev/null +++ b/src/machi_file_proxy_sup.erl @@ -0,0 +1,46 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc This is the main supervisor for the file proxies. +-module(machi_file_proxy_sup). +-behaviour(supervisor). + +%% public API +-export([ + start_link/0, + start_proxy/2 +]). + +%% supervisor callback +-export([ + init/1 +]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +start_proxy(Filename, DataDir) -> + supervisor:start_child(?MODULE, [Filename, DataDir]). + +init([]) -> + SupFlags = {simple_one_for_one, 1000, 10}, + ChildSpec = {unused, {machi_file_proxy, start_link, []}, + temporary, 2000, worker, [machi_file_proxy]}, + {ok, {SupFlags, [ChildSpec]}}. 
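Because the supervisor is `simple_one_for_one', the arguments given to `start_proxy/2' are appended to the child spec's empty argument list, so each child is started as `machi_file_proxy:start_link(Filename, DataDir)'. A hypothetical call (filename and directory invented):

    {ok, Proxy} = machi_file_proxy_sup:start_proxy("somefile", "./data"),
    ok = machi_file_proxy:sync(Proxy, all).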
diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 04e1022..06c76a7 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -64,1158 +64,579 @@ %% replication/chain repair. -module(machi_flu1). - --include_lib("kernel/include/file.hrl"). - --include("machi.hrl"). --include("machi_pb.hrl"). --include("machi_projection.hrl"). --define(V(X,Y), ok). -%% -include("machi_verbose.hrl"). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --endif. % TEST - --define(SERVER_CMD_READ_TIMEOUT, 600*1000). - --export([start_link/1, stop/1, - update_wedge_state/3, wedge_myself/2]). --export([make_listener_regname/1, make_projection_server_regname/1]). --export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3, - decode_csum_file_entry/1, - split_checksum_list_blob/1, split_checksum_list_blob_decode/1]). - --record(state, { - flu_name :: atom(), - proj_store :: pid(), - append_pid :: pid(), - tcp_port :: non_neg_integer(), - data_dir :: string(), - wedged = true :: boolean(), - etstab :: ets:tid(), - epoch_id :: 'undefined' | machi_dt:epoch_id(), - pb_mode = undefined :: 'undefined' | 'high' | 'low', - high_clnt :: 'undefined' | pid(), - dbg_props = [] :: list(), % proplist - props = [] :: list() % proplist - }). - --record(http_goop, { - len, % content-length - x_csum % x-checksum - }). - -start_link([{FluName, TcpPort, DataDir}|Rest]) - when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) -> - {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}. - -stop(Pid) -> - case erlang:is_process_alive(Pid) of - true -> - Pid ! killme, - ok; - false -> - error - end. - -update_wedge_state(PidSpec, Boolean, EpochId) - when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> - PidSpec ! {wedge_state_change, Boolean, EpochId}. - -wedge_myself(PidSpec, EpochId) - when is_tuple(EpochId) -> - PidSpec ! {wedge_myself, EpochId}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -ets_table_name(FluName) when is_atom(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_epoch"). -%% ets_table_name(FluName) when is_binary(FluName) -> -%% list_to_atom(binary_to_list(FluName) ++ "_epoch"). 
- -main2(FluName, TcpPort, DataDir, Rest) -> - {Props, DbgProps} = case proplists:get_value(dbg, Rest) of - undefined -> - {Rest, []}; - DPs -> - {lists:keydelete(dbg, 1, Rest), DPs} - end, - {SendAppendPidToProj_p, ProjectionPid} = - case proplists:get_value(projection_store_registered_name, Rest) of - undefined -> - RN = make_projection_server_regname(FluName), - {ok, PP} = - machi_projection_store:start_link(RN, DataDir, undefined), - {true, PP}; - RN -> - {false, whereis(RN)} - end, - InitialWedged_p = proplists:get_value(initial_wedged, DbgProps), - ProjRes = machi_projection_store:read_latest_projection(ProjectionPid, - private), - {Wedged_p, EpochId} = - if InitialWedged_p == undefined, - is_tuple(ProjRes), element(1, ProjRes) == ok -> - {ok, Proj} = ProjRes, - {false, {Proj#projection_v1.epoch_number, - Proj#projection_v1.epoch_csum}}; - InitialWedged_p == false -> - {false, ?DUMMY_PV1_EPOCH}; - true -> - {true, undefined} - end, - S0 = #state{flu_name=FluName, - proj_store=ProjectionPid, - tcp_port=TcpPort, - data_dir=DataDir, - wedged=Wedged_p, - etstab=ets_table_name(FluName), - epoch_id=EpochId, - dbg_props=DbgProps, - props=Props}, - AppendPid = start_append_server(S0, self()), - receive - append_server_ack -> ok - end, - if SendAppendPidToProj_p -> - machi_projection_store:set_wedge_notify_pid(ProjectionPid, - AppendPid); - true -> - ok - end, - S1 = S0#state{append_pid=AppendPid}, - ListenPid = start_listen_server(S1), - - Config_e = machi_util:make_config_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Config_e), - {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Data_e), - Projection_e = machi_util:make_projection_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Projection_e), - - put(flu_flu_name, FluName), - put(flu_append_pid, AppendPid), - put(flu_projection_pid, ProjectionPid), - put(flu_listen_pid, ListenPid), - receive killme -> ok end, - (catch exit(AppendPid, kill)), - (catch exit(ProjectionPid, kill)), - (catch exit(ListenPid, kill)), - ok. - -start_listen_server(S) -> - proc_lib:spawn_link(fun() -> run_listen_server(S) end). - -start_append_server(S, AckPid) -> - FluPid = self(), - proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end). - -run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> - register(make_listener_regname(FluName), self()), - SockOpts = ?PB_PACKET_OPTS ++ - [{reuseaddr, true}, {mode, binary}, {active, false}], - case gen_tcp:listen(TcpPort, SockOpts) of - {ok, LSock} -> - listen_server_loop(LSock, S); - Else -> - error_logger:warning_msg("~s:run_listen_server: " - "listen to TCP port ~w: ~w\n", - [?MODULE, TcpPort, Else]), - exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) - end. - -run_append_server(FluPid, AckPid, #state{flu_name=Name, - wedged=Wedged_p,epoch_id=EpochId}=S) -> - %% Reminder: Name is the "main" name of the FLU, i.e., no suffix - register(Name, self()), - TID = ets:new(ets_table_name(Name), - [set, protected, named_table, {read_concurrency, true}]), - %% InitialWedged = proplists:get_value(initial_wedged, DbgProps, true), - %% ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}), - ets:insert(TID, {epoch, {Wedged_p, EpochId}}), - AckPid ! append_server_ack, - append_server_loop(FluPid, S#state{etstab=TID}). - -listen_server_loop(LSock, S) -> - {ok, Sock} = gen_tcp:accept(LSock), - spawn_link(fun() -> net_server_loop(Sock, S) end), - listen_server_loop(LSock, S). 
- -append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, - epoch_id=OldEpochId, flu_name=FluName}=S) -> - AppendServerPid = self(), - receive - {seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID} - when Wedged_p -> - From ! wedged, - append_server_loop(FluPid, S); - {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} -> - spawn(fun() -> append_server_dispatch(From, Prefix, - Chunk, CSum, Extra, EpochID, - DataDir, AppendServerPid) end), - append_server_loop(FluPid, S); - {wedge_myself, WedgeEpochId} -> - if not Wedged_p andalso WedgeEpochId == OldEpochId -> - true = ets:insert(S#state.etstab, - {epoch, {true, OldEpochId}}), - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - append_server_loop(FluPid, S#state{wedged=true}); - true -> - append_server_loop(FluPid, S) - end; - {wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} -> - OldEpoch = case OldEpochId of {OldE, _} -> OldE; - undefined -> -1 - end, - if NewEpoch >= OldEpoch -> - true = ets:insert(S#state.etstab, - {epoch, {Boolean, NewEpochId}}), - append_server_loop(FluPid, S#state{wedged=Boolean, - epoch_id=NewEpochId}); - true -> - append_server_loop(FluPid, S) - end; - {wedge_status, FromPid} -> - #state{wedged=Wedged_p, epoch_id=EpochId} = S, - FromPid ! {wedge_status_reply, Wedged_p, EpochId}, - append_server_loop(FluPid, S); - Else -> - io:format(user, "append_server_loop: WHA? ~p\n", [Else]), - append_server_loop(FluPid, S) - end. - -net_server_loop(Sock, S) -> - case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of - {ok, Bin} -> - {RespBin, S2} = - case machi_pb:decode_mpb_ll_request(Bin) of - LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> - {R, NewS} = do_pb_ll_request(LL_req, S), - {maybe_encode_response(R), mode(low, NewS)}; - _ -> - HL_req = machi_pb:decode_mpb_request(Bin), - 1 = HL_req#mpb_request.do_not_alter, - {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), - {machi_pb:encode_mpb_response(R), mode(high, NewS)} - end, - if RespBin == async_no_response -> - ok; - true -> - ok = gen_tcp:send(Sock, RespBin) - end, - net_server_loop(Sock, S2); - {error, SockError} -> - Msg = io_lib:format("Socket error ~w", [SockError]), - R = #mpb_ll_response{req_id= <<>>, - generic=#mpb_errorresp{code=1, msg=Msg}}, - Resp = machi_pb:encode_mpb_ll_response(R), - %% TODO: Weird that sometimes neither catch nor try/catch - %% can prevent OTP's SASL from logging an error here. - %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... - %% TODO: is this what causes the intermittent PULSE deadlock errors? - %% _ = (catch gen_tcp:send(Sock, Resp)), timer:sleep(1000), - (catch gen_tcp:close(Sock)), - exit(normal) - end. - -maybe_encode_response(async_no_response=X) -> - X; -maybe_encode_response(R) -> - machi_pb:encode_mpb_ll_response(R). - -mode(Mode, #state{pb_mode=undefined}=S) -> - S#state{pb_mode=Mode}; -mode(_, S) -> - S. - -make_high_clnt(#state{high_clnt=undefined}=S) -> - {ok, Proj} = machi_projection_store:read_latest_projection( - S#state.proj_store, private), - Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( - Proj#projection_v1.members_dict)], - {ok, Clnt} = machi_cr_client:start_link(Ps), - S#state{high_clnt=Clnt}; -make_high_clnt(S) -> - S. 
- -do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> - Result = {high_error, 41, "Low protocol request while in high mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_ll_request(PB_request, S) -> - Req = machi_pb_translate:from_pb_request(PB_request), - {ReqID, Cmd, Result, S2} = - case Req of - {RqID, {LowCmd, _}=CMD} - when LowCmd == low_proj; - LowCmd == low_wedge_status; LowCmd == low_list_files -> - %% Skip wedge check for projection commands! - %% Skip wedge check for these unprivileged commands - {Rs, NewS} = do_pb_ll_request3(CMD, S), - {RqID, CMD, Rs, NewS}; - {RqID, CMD} -> - EpochID = element(2, CMD), % by common convention - {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), - {RqID, CMD, Rs, NewS} - end, - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. - -do_pb_ll_request2(EpochID, CMD, S) -> - {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), - if Wedged_p == true -> - {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; - is_tuple(EpochID) - andalso - EpochID /= CurrentEpochID -> - {Epoch, _} = EpochID, - {CurrentEpoch, _} = CurrentEpochID, - if Epoch < CurrentEpoch -> - ok; - true -> - %% We're at same epoch # but different checksum, or - %% we're at a newer/bigger epoch #. - wedge_myself(S#state.flu_name, CurrentEpochID), - ok - end, - {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; - true -> - do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) - end. - -do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> - {Msg, S}; -do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> - {-6, S}; -do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag, - CSum, ChunkExtra}, S) -> - {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S), S}; -do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, - CSum}, S) -> - {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; -do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) -> - {do_server_read_chunk(File, Offset, Size, Opts, S), S}; -do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) -> - {do_server_checksum_listing(File, S), S}; -do_pb_ll_request3({low_list_files, _EpochID}, S) -> - {do_server_list_files(S), S}; -do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> - {do_server_wedge_status(S), S}; -do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) -> - {do_server_delete_migration(File, S), S}; -do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) -> - {do_server_trunc_hack(File, S), S}; -do_pb_ll_request3({low_proj, PCMD}, S) -> - {do_server_proj_request(PCMD, S), S}. - -do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> - Result = {low_error, 41, "High protocol request while in low mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_hl_request(PB_request, S) -> - {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), - {Result, S2} = do_pb_hl_request2(Cmd, S), - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. 
- -do_pb_hl_request2({high_echo, Msg}, S) -> - {Msg, S}; -do_pb_hl_request2({high_auth, _User, _Pass}, S) -> - {-77, S}; -do_pb_hl_request2({high_append_chunk, _todoPK, Prefix, ChunkBin, TaggedCSum, - ChunkExtra}, #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk, - ChunkExtra), - {Res, S}; -do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, - #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), - {Res, S}; -do_pb_hl_request2({high_read_chunk, File, Offset, Size}, - #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size), - {Res, S}; -do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:checksum_list(Clnt, File), - {Res, S}; -do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:list_files(Clnt), - {Res, S}. - -do_server_proj_request({get_latest_epochid, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epochid(ProjStore, ProjType); -do_server_proj_request({read_latest_projection, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read_latest_projection(ProjStore, ProjType); -do_server_proj_request({read_projection, ProjType, Epoch}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read(ProjStore, ProjType, Epoch); -do_server_proj_request({write_projection, ProjType, Proj}, - #state{proj_store=ProjStore}) -> - machi_projection_store:write(ProjStore, ProjType, Proj); -do_server_proj_request({get_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_all_projections(ProjStore, ProjType); -do_server_proj_request({list_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType); -do_server_proj_request({kick_projection_reaction}, - #state{flu_name=FluName}) -> - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - async_no_response. - -do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S) -> - case sanitize_file_string(Prefix) of - ok -> - do_server_append_chunk2(PKey, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S); - _ -> - {error, bad_arg} - end. - -do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, - ChunkExtra, #state{flu_name=FluName, - epoch_id=EpochID}=_S) -> - %% TODO: Do anything with PKey? - try - TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, - ChunkExtra, EpochID}, - receive - {assignment, Offset, File} -> - Size = iolist_size(Chunk), - {ok, {Offset, Size, File}}; - wedged -> - {error, wedged} - after 10*1000 -> - {error, partition} - end - catch - throw:{bad_csum, _CS} -> - {error, bad_checksum}; - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), - {error, bad_arg} - end. 
- -do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, - #state{data_dir=DataDir}=S) -> - case sanitize_file_string(File) of - ok -> - CSumPath = machi_util:make_checksum_filename(DataDir, File), - case file:open(CSumPath, [append, raw, binary]) of - {ok, FHc} -> - Path = DataDir ++ "/data/" ++ - machi_util:make_string(File), - {ok, FHd} = file:open(Path, [read, write, raw, binary]), - try - do_server_write_chunk2( - File, Offset, Chunk, CSum_tag, CSum, DataDir, - FHc, FHd) - after - (catch file:close(FHc)), - (catch file:close(FHd)) - end; - {error, enoent} -> - ok = filelib:ensure_dir(CSumPath), - do_server_write_chunk(File, Offset, Chunk, CSum_tag, - CSum, S) - end; - _ -> - {error, bad_arg} - end. - -do_server_write_chunk2(_File, Offset, Chunk, CSum_tag, - Client_CSum, _DataDir, FHc, FHd) -> - try - TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - Size = iolist_size(Chunk), - case file:pwrite(FHd, Offset, Chunk) of - ok -> - CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), - ok = file:write(FHc, CSum_info), - ok; - _Else3 -> - machi_util:verb("Else3 ~p ~p ~p\n", - [Offset, Size, _Else3]), - {error, bad_arg} - end - catch - throw:{bad_csum, _CS} -> - {error, bad_checksum}; - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), - {error, bad_arg} - end. - -do_server_read_chunk(File, Offset, Size, _Opts, #state{data_dir=DataDir})-> - %% TODO: Look inside Opts someday. - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, binary, raw]) of - {ok, FH} -> - try - case file:pread(FH, Offset, Size) of - {ok, Bytes} when byte_size(Bytes) == Size -> - {ok, Bytes}; - {ok, Bytes} -> - machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n", - [Size,size(Bytes),File,Offset]), - io:format(user, "ok read but wanted ~p got ~p: ~p @ offset ~p\n", - [Size,size(Bytes),File,Offset]), - {error, partial_read}; - eof -> - {error, not_written}; %% TODO perhaps_do_net_server_ec_read(Sock, FH); - _Else2 -> - machi_util:verb("Else2 ~p ~p ~P\n", - [Offset, Size, _Else2, 20]), - {error, bad_read} - end - after - file:close(FH) - end; - {error, enoent} -> - {error, not_written}; - {error, _Else} -> - io:format(user, "Unexpected ~p at ~p ~p\n", - [_Else, ?MODULE, ?LINE]), - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - -do_server_checksum_listing(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - ok = sync_checksum_file(File), - CSumPath = machi_util:make_checksum_filename(DataDir, File), - %% TODO: If this file is legitimately bigger than our - %% {packet_size,N} limit, then we'll have a difficult time, eh? - case file:read_file(CSumPath) of - {ok, Bin} -> - if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> - %% TODO: Fix this limitation by streaming the - %% binary in multiple smaller PB messages. - %% Also, don't read the file all at once. ^_^ - error_logger:error_msg("~s:~w oversize ~s\n", - [?MODULE, ?LINE, CSumPath]), - {error, bad_arg}; - true -> - {ok, Bin} - end; - {error, enoent} -> - {error, no_such_file}; - {error, _} -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. 
- -do_server_list_files(#state{data_dir=DataDir}=_S) -> - {_, WildPath} = machi_util:make_data_filename(DataDir, ""), - Files = filelib:wildcard("*", WildPath), - {ok, [begin - {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), - Size = FI#file_info.size, - {Size, File} - end || File <- Files]}. - -do_server_wedge_status(S) -> - {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), - CurrentEpochID = if CurrentEpochID0 == undefined -> - ?DUMMY_PV1_EPOCH; - true -> - CurrentEpochID0 - end, - {Wedged_p, CurrentEpochID}. - -do_server_delete_migration(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:delete(Path) of - ok -> - ok; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - -do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, write, binary, raw]) of - {ok, FH} -> - try - {ok, ?MINIMUM_OFFSET} = file:position(FH, - ?MINIMUM_OFFSET), - ok = file:truncate(FH), - ok - after - file:close(FH) - end; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - -append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID, - DataDir, LinkPid) -> - Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid), - Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID}, - exit(normal). - -sanitize_file_string(Str) -> - case re:run(Str, "/") of - nomatch -> - ok; - _ -> - error - end. - -sync_checksum_file(File) -> - Prefix = re:replace(File, "\\..*", "", [{return, binary}]), - case write_server_find_pid(Prefix) of - undefined -> - ok; - Pid -> - Ref = make_ref(), - Pid ! {sync_stuff, self(), Ref}, - receive - {sync_finished, Ref} -> - ok - after 5000 -> - case write_server_find_pid(Prefix) of - undefined -> - ok; - Pid2 when Pid2 /= Pid -> - ok; - _Pid2 -> - error - end - end - end. - -write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) -> - case write_server_find_pid(Prefix) of - undefined -> - start_seq_append_server(Prefix, EpochID, DataDir, LinkPid), - timer:sleep(1), - write_server_get_pid(Prefix, EpochID, DataDir, LinkPid); - Pid -> - Pid - end. - -write_server_find_pid(Prefix) -> - FluName = machi_util:make_regname(Prefix), - whereis(FluName). - -start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> - proc_lib:spawn_link(fun() -> - %% The following is only necessary to - %% make nice process relationships in - %% 'appmon' and related tools. - put('$ancestors', [AppendServerPid]), - put('$initial_call', {x,y,3}), - link(AppendServerPid), - run_seq_append_server(Prefix, EpochID, DataDir) - end). - -run_seq_append_server(Prefix, EpochID, DataDir) -> - true = register(machi_util:make_regname(Prefix), self()), - run_seq_append_server2(Prefix, EpochID, DataDir). - -run_seq_append_server2(Prefix, EpochID, DataDir) -> - FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, - case machi_util:increment_max_filenum(DataDir, Prefix) of - ok -> - machi_util:info_msg("start: ~p server at file ~w\n", - [Prefix, FileNum]), - seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); - Else -> - error_logger:error_msg("start: ~p server at file ~w: ~p\n", - [Prefix, FileNum, Else]), - exit(Else) - - end. - --spec seq_name_hack() -> string(). 
-seq_name_hack() -> - lists:flatten(io_lib:format("~.36B~.36B", - [element(3,now()), - list_to_integer(os:getpid())])). - -seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> - SequencerNameHack = seq_name_hack(), - {File, FullPath} = machi_util:make_data_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHd} = file:open(FullPath, - [read, write, raw, binary]), - CSumPath = machi_util:make_checksum_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHc} = file:open(CSumPath, [append, raw, binary]), - seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, - ?MINIMUM_OFFSET). - -seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, - FileNum, Offset) - when Offset > ?MAX_FILE_SIZE -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", - [Prefix, FileNum, Offset]), - run_seq_append_server2(Prefix, EpochID, DataDir); -seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, - FileNum, Offset) -> - receive - {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} - when R_EpochID == EpochID -> - if Chunk /= <<>> -> - ok = file:pwrite(FHd, Offset, Chunk); - true -> - ok - end, - From ! {assignment, Offset, File}, - Size = iolist_size(Chunk), - CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), - ok = file:write(FHc, CSum_info), - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset + Size + Extra); - {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> - %% Rare'ish event: send MSG to myself so it doesn't get lost - %% while we recurse around to pick up a new FileNum. - self() ! MSG, - machi_util:info_msg("rollover: ~p server at file ~w offset ~w " - "by new epoch_id ~W\n", - [Prefix, FileNum, Offset, R_EpochID, 8]), - run_seq_append_server2(Prefix, R_EpochID, DataDir); - {sync_stuff, FromPid, Ref} -> - file:sync(FHc), - FromPid ! {sync_finished, Ref}, - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset) - after 30*1000 -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", - [Prefix, self(), FileNum, Offset]), - exit(normal) - end. - -make_listener_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_listener"). - -%% This is the name of the projection store that is spawned by the -%% *flu*, for use primarily in testing scenarios. In normal use, we -%% ought to be using the OTP style of managing processes, via -%% supervisors, namely via machi_flu_psup.erl, which uses a -%% *different* naming convention for the projection store name that it -%% registers. - -make_projection_server_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). - -http_hack_server(FluName, Line1, Sock, S) -> - {ok, {http_request, HttpOp, URI0, _HttpV}, _x} = - erlang:decode_packet(http_bin, Line1, [{line_length,4095}]), - MyURI = case URI0 of - {abs_path, Path} -> <<"/", Rest/binary>> = Path, - Rest; - _ -> URI0 - end, - Hdrs = http_hack_harvest_headers(Sock), - G = http_hack_digest_header_goop(Hdrs, #http_goop{}), - case HttpOp of - 'PUT' -> - http_hack_server_put(Sock, G, FluName, MyURI); - 'GET' -> - http_hack_server_get(Sock, G, FluName, MyURI, S) - end, - ok = gen_tcp:close(Sock), - exit(normal). 
-
-http_hack_server_put(Sock, G, FluName, MyURI) ->
-    ok = inet:setopts(Sock, [{packet, raw}]),
-    {ok, Chunk} = gen_tcp:recv(Sock, G#http_goop.len, 60*1000),
-    CSum0 = machi_util:checksum_chunk(Chunk),
-    try
-        CSum = case G#http_goop.x_csum of
-                   undefined ->
-                       machi_util:make_tagged_csum(server_sha, CSum0);
-                   XX when is_binary(XX) ->
-                       if XX == CSum0 ->
-                               machi_util:make_tagged_csum(client_sha, CSum0);
-                          true ->
-                               throw({bad_csum, XX})
-                       end
-               end,
-        FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0, todo_epoch_id_bitrot}
-    catch
-        throw:{bad_csum, _CS} ->
-            Out = "HTTP/1.0 412 Precondition failed\r\n"
-                "X-Reason: bad checksum\r\n\r\n",
-            ok = gen_tcp:send(Sock, Out),
-            ok = gen_tcp:close(Sock),
-            exit(normal);
-        error:badarg ->
-            error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
-    end,
-    receive
-        {assignment, Offset, File} ->
-            Msg = io_lib:format("HTTP/1.0 201 Created\r\nLocation: ~s\r\n"
-                                "X-Offset: ~w\r\nX-Size: ~w\r\n\r\n",
-                                [File, Offset, byte_size(Chunk)]),
-            ok = gen_tcp:send(Sock, Msg);
-        wedged ->
-            ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 WEDGED\r\n\r\n">>)
-    after 10*1000 ->
-            ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 TIMEOUT\r\n\r\n">>)
-    end.
-
-http_hack_server_get(Sock, _G, _FluName, _MyURI, _S) ->
-    ok = gen_tcp:send(Sock, <<"TODO BROKEN FEATURE see old commits\r\n">>).
-
-http_hack_harvest_headers(Sock) ->
-    ok = inet:setopts(Sock, [{packet, httph}]),
-    http_hack_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
-                              Sock, []).
-
-http_hack_harvest_headers({ok, http_eoh}, _Sock, Acc) ->
-    Acc;
-http_hack_harvest_headers({error, _}, _Sock, _Acc) ->
-    [];
-http_hack_harvest_headers({ok, Hdr}, Sock, Acc) ->
-    http_hack_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
-                              Sock, [Hdr|Acc]).
-
-http_hack_digest_header_goop([], G) ->
-    G;
-http_hack_digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) ->
-    http_hack_digest_header_goop(T, G#http_goop{len=list_to_integer(Str)});
-http_hack_digest_header_goop([{http_header, _, "X-Checksum", _, Str}|T], G) ->
-    SHA = machi_util:hexstr_to_bin(Str),
-    CSum = machi_util:make_tagged_csum(client_sha, SHA),
-    http_hack_digest_header_goop(T, G#http_goop{x_csum=CSum});
-http_hack_digest_header_goop([_H|T], G) ->
-    http_hack_digest_header_goop(T, G).
-
-http_hack_split_uri_options(OpsBin) ->
-    L = binary:split(OpsBin, <<"&">>),
-    [case binary:split(X, <<"=">>) of
-         [<<"offset">>, Bin] ->
-             {offset, binary_to_integer(Bin)};
-         [<<"size">>, Bin] ->
-             {size, binary_to_integer(Bin)}
-     end || X <- L].
-
-%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
-%% internal storage by the FLU.
-
--spec encode_csum_file_entry(
-        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
-        iolist().
-encode_csum_file_entry(Offset, Size, TaggedCSum) ->
-    Len = 8 + 4 + byte_size(TaggedCSum),
-    [<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
-     TaggedCSum].
-
-%% @doc Encode `Offset + Size + TaggedCSum' into a `binary()' type for
-%% internal storage by the FLU.
-
--spec encode_csum_file_entry_bin(
-        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
-        binary().
-encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
-    Len = 8 + 4 + byte_size(TaggedCSum),
-    <<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
-      TaggedCSum/binary>>.
-
-%% @doc Decode a single `binary()' blob into an
-%% `{Offset,Size,TaggedCSum}' tuple.
-%%
-%% The internal encoding (which is currently exposed to the outside world
-%% via this function and related ones) is:
-%%
-%% <ul>
-%% <li> 1 byte: record length
-%% </li>
-%% <li> 8 bytes (unsigned big-endian): byte offset
-%% </li>
-%% <li> 4 bytes (unsigned big-endian): chunk size
-%% </li>
-%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
-%% </li>
-%% </ul>
-%%
-%% See `machi.hrl' for the tagged checksum types, e.g.,
-%% `?CSUM_TAG_NONE'.
-
--spec decode_csum_file_entry(binary()) ->
-          error |
-          {machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
-decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
-    {Offset, Size, TaggedCSum};
-decode_csum_file_entry(_Else) ->
-    error.
-
-%% @doc Split a `binary()' blob of `checksum_list' data into a list of
-%% unparsed `binary()' blobs, one per entry.
-%%
-%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if
-%% desired.
-%%
-%% The return value `TrailingJunk' is unparseable bytes at the end of
-%% the checksum list blob.
-
--spec split_checksum_list_blob(binary()) ->
-          {list(binary()), TrailingJunk::binary()}.
-split_checksum_list_blob(Bin) ->
-    split_checksum_list_blob(Bin, []).
-
-split_checksum_list_blob(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
-    case get(hack_length) of
-        Len -> ok;
-        _   -> put(hack_different, true)
-    end,
-    split_checksum_list_blob(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
-split_checksum_list_blob(Rest, Acc) ->
-    {lists:reverse(Acc), Rest}.
-
-%% @doc Split a `binary()' blob of `checksum_list' data into a list of
-%% `{Offset,Size,TaggedCSum}' tuples.
-
--spec split_checksum_list_blob_decode(binary()) ->
-          {list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
-           TrailingJunk::binary()}.
-split_checksum_list_blob_decode(Bin) ->
-    split_checksum_list_blob_decode(Bin, []).
-
-split_checksum_list_blob_decode(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
-    One = <<Len:8/unsigned-big, Part/binary>>,
-    case decode_csum_file_entry(One) of
-        error ->
-            split_checksum_list_blob_decode(Rest, Acc);
-        DecOne ->
-            split_checksum_list_blob_decode(Rest, [DecOne|Acc])
-    end;
-split_checksum_list_blob_decode(Rest, Acc) ->
-    {lists:reverse(Acc), Rest}.
-
-check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
-    %% TODO: If the client was foolish enough to use
-    %% this type of non-checksum, then the client gets
-    %% what it deserves wrt data integrity, alas.  In
-    %% the client-side Chain Replication method, each
-    %% server will calculate this independently, which
-    %% isn't exactly what ought to happen for best data
-    %% integrity checking.  In server-side CR, the csum
-    %% should be calculated by the head and passed down
-    %% the chain together with the value.
-    CS = machi_util:checksum_chunk(Chunk),
-    machi_util:make_tagged_csum(server_sha, CS);
-check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
-    CS = machi_util:checksum_chunk(Chunk),
-    if CS == Client_CSum ->
-            machi_util:make_tagged_csum(server_sha,
-                                        Client_CSum);
-       true ->
-            throw({bad_csum, CS})
-    end.
-
--ifdef(TEST).
-
-%% Remove "_COMMENTED" string to run the demo/exploratory code.
-
-timing_demo_test_COMMENTED_() ->
-    {timeout, 300, fun() -> timing_demo_test2() end}.
-
-%% Demo/exploratory hackery to check relative speeds of dealing with
-%% checksum data in different ways.
-%%
-%% Summary:
-%%
-%% * Use compact binary encoding, with 1 byte header for entry length.
-%% * Because the hex-style code is *far* slower just for enc & dec ops.
-%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec.
-%% * File sorter when sorting binaries as-is is only 30-40% slower
-%%   than an in-memory split (of huge binary emulated by file:read_file()
-%%   "big slurp") and sort of the same as-is sortable binaries.
-%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2}
-%%   function must be used, i.e. because the checksum entry lengths differ.
-%% * File sorter + {order, fun compare/2} is still *far* faster than external -%% sort by OS X's sort(1) of sortable ASCII hex-style: -%% 4.5 sec vs. 21 sec. -%% * File sorter {order, fun compare/2} is faster than in-memory sort -%% of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec. - -timing_demo_test2() -> - Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)], - CSum = <<"123456789abcdef0A">>, - 17 = byte_size(CSum), - io:format(user, "\n", []), - - %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]), - io:format(user, "Hex-style file entry enc+dec: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {HexUSec, _} = - timer:tc(fun() -> - lists:foldl(fun(X, _) -> - B = encode_csum_file_entry_hex(X, 100, CSum), - %% file:write(ZZZ, [B, 10]), - decode_csum_file_entry_hex(list_to_binary(B)) - end, x, Xs) - end), - io:format(user, "~.3f sec\n", [HexUSec / 1000000]), - %% %% file:close(ZZZ), - - io:format(user, "Not-sortable file entry enc+dec: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotSortedUSec, _} = - timer:tc(fun() -> - lists:foldl(fun(X, _) -> - B = encode_csum_file_entry(X, 100, CSum), - decode_csum_file_entry(list_to_binary(B)) - end, x, Xs) - end), - io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]), - - NotHexList = lists:foldl(fun(X, Acc) -> - B = encode_csum_file_entry(X, 100, CSum), - [B|Acc] - end, [], Xs), - NotHexBin = iolist_to_binary(NotHexList), - - io:format(user, "Split NotHexBin: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexBinUSec, SplitRes} = - timer:tc(fun() -> - put(hack_length, 29), - put(hack_different, false), - {Sorted, _Leftover} = split_checksum_list_blob(NotHexBin), - io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]), - Sorted - end), - io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]), - - io:format(user, "Sort Split results: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {SortSplitUSec, _} = - timer:tc(fun() -> - lists:sort(SplitRes) - %% lists:sort(fun sort_2lines/2, SplitRes) - end), - io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]), - - UnsortedName = "/tmp/foo.unsorted", - SortedName = "/tmp/foo.sorted", - - ok = file:write_file(UnsortedName, NotHexList), - io:format(user, "File Sort Split results: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {FileSortUSec, _} = - timer:tc(fun() -> - {ok, FHin} = file:open(UnsortedName, [read, binary]), - {ok, FHout} = file:open(SortedName, - [write, binary, delayed_write]), - put(hack_sorter_sha_ctx, crypto:hash_init(sha)), - ok = file_sorter:sort(sort_input_fun(FHin, <<>>), - sort_output_fun(FHout), - [{format,binary}, - {header, 1} - %% , {order, fun sort_2lines/2} - ]) - end), - io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]), - _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)), - %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]), - - io:format(user, "NotHex-Not-sortable tuple list creation: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexTupleCreationUSec, NotHexTupleList} = - timer:tc(fun() -> - lists:foldl(fun(X, Acc) -> - B = encode_csum_file_entry_hex( - X, 100, CSum), - [B|Acc] - end, [], Xs) - end), - io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]), - - io:format(user, "NotHex-Not-sortable tuple list sort: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexTupleSortUSec, 
_} =
-        timer:tc(fun() ->
-                         lists:sort(NotHexTupleList)
-                 end),
-    io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]),
-
-    ok.
-
-sort_2lines(<<_:1/binary, A/binary>>, <<_:1/binary, B/binary>>) ->
-    A < B.
-
-sort_input_fun(FH, PrevStuff) ->
-    fun(close) ->
-            ok;
-       (read) ->
-            case file:read(FH, 1024*1024) of
-                {ok, NewStuff} ->
-                    AllStuff = if PrevStuff == <<>> ->
-                                       NewStuff;
-                                  true ->
-                                       <<PrevStuff/binary, NewStuff/binary>>
-                               end,
-                    {SplitRes, Leftover} = split_checksum_list_blob(AllStuff),
-                    {SplitRes, sort_input_fun(FH, Leftover)};
-                eof ->
-                    end_of_input
-            end
-    end.
-
-sort_output_fun(FH) ->
-    fun(close) ->
-            file:close(FH);
-       (Stuff) ->
-            Ctx = get(hack_sorter_sha_ctx),
-            put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)),
-            ok = file:write(FH, Stuff),
-            sort_output_fun(FH)
-    end.
-
-encode_csum_file_entry_hex(Offset, Size, TaggedCSum) ->
-    OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
-    SizeHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
-    CSumHex = machi_util:bin_to_hexstr(TaggedCSum),
-    [OffsetHex, 32, SizeHex, 32, CSumHex].
-
-decode_csum_file_entry_hex(<<OffsetHex:16/binary, _:1/binary, SizeHex:8/binary, _:1/binary, CSumHex/binary>>) ->
-    Offset = machi_util:hexstr_to_bin(OffsetHex),
-    Size = machi_util:hexstr_to_bin(SizeHex),
-    CSum = machi_util:hexstr_to_bin(CSumHex),
-    {Offset, Size, CSum}.
-
--endif. % TEST
+%
+%-include_lib("kernel/include/file.hrl").
+%
+%-include("machi.hrl").
+%-include("machi_pb.hrl").
+%-include("machi_projection.hrl").
+%-define(V(X,Y), ok).
+%%% -include("machi_verbose.hrl").
+%
+%-ifdef(TEST).
+%-include_lib("eunit/include/eunit.hrl").
+%-endif. % TEST
+%
+%
+%-export([start_link/1, stop/1,
+%         update_wedge_state/3, wedge_myself/2]).
+%-export([make_listener_regname/1, make_projection_server_regname/1]).
+%-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
+%         decode_csum_file_entry/1,
+%         split_checksum_list_blob/1, split_checksum_list_blob_decode/1]).
+%
+%-record(state, {
+%          flu_name        :: atom(),
+%          proj_store      :: pid(),
+%          append_pid      :: pid(),
+%          tcp_port        :: non_neg_integer(),
+%          data_dir        :: string(),
+%          wedged = true   :: boolean(),
+%          etstab          :: ets:tid(),
+%          epoch_id        :: 'undefined' | machi_dt:epoch_id(),
+%          pb_mode = undefined :: 'undefined' | 'high' | 'low',
+%          high_clnt       :: 'undefined' | pid(),
+%          dbg_props = []  :: list(), % proplist
+%          props = []      :: list()  % proplist
+%         }).
+%
+%start_link([{FluName, TcpPort, DataDir}|Rest])
+%  when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
+%    {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
+%
+%stop(Pid) ->
+%    case erlang:is_process_alive(Pid) of
+%        true ->
+%            Pid ! killme,
+%            ok;
+%        false ->
+%            error
+%    end.
+%
+%update_wedge_state(PidSpec, Boolean, EpochId)
+%  when (Boolean == true orelse Boolean == false), is_tuple(EpochId) ->
+%    PidSpec ! {wedge_state_change, Boolean, EpochId}.
+%
+%wedge_myself(PidSpec, EpochId)
+%  when is_tuple(EpochId) ->
+%    PidSpec ! {wedge_myself, EpochId}.
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+%ets_table_name(FluName) when is_atom(FluName) ->
+%    list_to_atom(atom_to_list(FluName) ++ "_epoch").
+%%% ets_table_name(FluName) when is_binary(FluName) ->
+%%%     list_to_atom(binary_to_list(FluName) ++ "_epoch").
+% +%main2(FluName, TcpPort, DataDir, Rest) -> +% {Props, DbgProps} = case proplists:get_value(dbg, Rest) of +% undefined -> +% {Rest, []}; +% DPs -> +% {lists:keydelete(dbg, 1, Rest), DPs} +% end, +% {SendAppendPidToProj_p, ProjectionPid} = +% case proplists:get_value(projection_store_registered_name, Rest) of +% undefined -> +% RN = make_projection_server_regname(FluName), +% {ok, PP} = +% machi_projection_store:start_link(RN, DataDir, undefined), +% {true, PP}; +% RN -> +% {false, whereis(RN)} +% end, +% InitialWedged_p = proplists:get_value(initial_wedged, DbgProps), +% ProjRes = machi_projection_store:read_latest_projection(ProjectionPid, +% private), +% {Wedged_p, EpochId} = +% if InitialWedged_p == undefined, +% is_tuple(ProjRes), element(1, ProjRes) == ok -> +% {ok, Proj} = ProjRes, +% {false, {Proj#projection_v1.epoch_number, +% Proj#projection_v1.epoch_csum}}; +% InitialWedged_p == false -> +% {false, ?DUMMY_PV1_EPOCH}; +% true -> +% {true, undefined} +% end, +% S0 = #state{flu_name=FluName, +% proj_store=ProjectionPid, +% tcp_port=TcpPort, +% data_dir=DataDir, +% wedged=Wedged_p, +% etstab=ets_table_name(FluName), +% epoch_id=EpochId, +% dbg_props=DbgProps, +% props=Props}, +% AppendPid = start_append_server(S0, self()), +% receive +% append_server_ack -> ok +% end, +% if SendAppendPidToProj_p -> +% machi_projection_store:set_wedge_notify_pid(ProjectionPid, +% AppendPid); +% true -> +% ok +% end, +% S1 = S0#state{append_pid=AppendPid}, +% ListenPid = start_listen_server(S1), +% +% Config_e = machi_util:make_config_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Config_e), +% {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Data_e), +% Projection_e = machi_util:make_projection_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Projection_e), +% +% put(flu_flu_name, FluName), +% put(flu_append_pid, AppendPid), +% put(flu_projection_pid, ProjectionPid), +% put(flu_listen_pid, ListenPid), +% receive killme -> ok end, +% (catch exit(AppendPid, kill)), +% (catch exit(ProjectionPid, kill)), +% (catch exit(ListenPid, kill)), +% ok. +% +% +% +% +% +%do_server_proj_request({get_latest_epochid, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:get_latest_epochid(ProjStore, ProjType); +%do_server_proj_request({read_latest_projection, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:read_latest_projection(ProjStore, ProjType); +%do_server_proj_request({read_projection, ProjType, Epoch}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:read(ProjStore, ProjType, Epoch); +%do_server_proj_request({write_projection, ProjType, Proj}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:write(ProjStore, ProjType, Proj); +%do_server_proj_request({get_all_projections, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:get_all_projections(ProjStore, ProjType); +%do_server_proj_request({list_all_projections, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:list_all_projections(ProjStore, ProjType); +%do_server_proj_request({kick_projection_reaction}, +% #state{flu_name=FluName}) -> +% %% Tell my chain manager that it might want to react to +% %% this new world. +% Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), +% spawn(fun() -> +% catch machi_chain_manager1:trigger_react_to_env(Chmgr) +% end), +% async_no_response. 
+% +%start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> +% proc_lib:spawn_link(fun() -> +% %% The following is only necessary to +% %% make nice process relationships in +% %% 'appmon' and related tools. +% put('$ancestors', [AppendServerPid]), +% put('$initial_call', {x,y,3}), +% link(AppendServerPid), +% run_seq_append_server(Prefix, EpochID, DataDir) +% end). +% +%run_seq_append_server(Prefix, EpochID, DataDir) -> +% true = register(machi_util:make_regname(Prefix), self()), +% run_seq_append_server2(Prefix, EpochID, DataDir). +% +%run_seq_append_server2(Prefix, EpochID, DataDir) -> +% FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, +% case machi_util:increment_max_filenum(DataDir, Prefix) of +% ok -> +% machi_util:info_msg("start: ~p server at file ~w\n", +% [Prefix, FileNum]), +% seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); +% Else -> +% error_logger:error_msg("start: ~p server at file ~w: ~p\n", +% [Prefix, FileNum, Else]), +% exit(Else) +% +% end. +% +%-spec seq_name_hack() -> string(). +%seq_name_hack() -> +% lists:flatten(io_lib:format("~.36B~.36B", +% [element(3,now()), +% list_to_integer(os:getpid())])). +% +%seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> +% SequencerNameHack = seq_name_hack(), +% {File, FullPath} = machi_util:make_data_filename( +% DataDir, Prefix, SequencerNameHack, FileNum), +% {ok, FHd} = file:open(FullPath, +% [read, write, raw, binary]), +% CSumPath = machi_util:make_checksum_filename( +% DataDir, Prefix, SequencerNameHack, FileNum), +% {ok, FHc} = file:open(CSumPath, [append, raw, binary]), +% seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, +% ?MINIMUM_OFFSET). +% +%seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, +% FileNum, Offset) +% when Offset > ?MAX_FILE_SIZE -> +% ok = file:close(FHd), +% ok = file:close(FHc), +% machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", +% [Prefix, FileNum, Offset]), +% run_seq_append_server2(Prefix, EpochID, DataDir); +%seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, +% FileNum, Offset) -> +% receive +% {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} +% when R_EpochID == EpochID -> +% if Chunk /= <<>> -> +% %% Do we want better error handling here than just a bad match crash? +% %% Does the error tuple need to propagate to somewhere? +% ok = try_write_position(FHd, Offset, Chunk); +% true -> +% ok +% end, +% From ! {assignment, Offset, File}, +% Size = iolist_size(Chunk), +% CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), +% ok = file:write(FHc, CSum_info), +% seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, +% FileNum, Offset + Size + Extra); +% {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> +% %% Rare'ish event: send MSG to myself so it doesn't get lost +% %% while we recurse around to pick up a new FileNum. +% self() ! MSG, +% machi_util:info_msg("rollover: ~p server at file ~w offset ~w " +% "by new epoch_id ~W\n", +% [Prefix, FileNum, Offset, R_EpochID, 8]), +% run_seq_append_server2(Prefix, R_EpochID, DataDir); +% {sync_stuff, FromPid, Ref} -> +% file:sync(FHc), +% FromPid ! {sync_finished, Ref}, +% seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, +% FileNum, Offset) +% after 30*1000 -> +% ok = file:close(FHd), +% ok = file:close(FHc), +% machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", +% [Prefix, self(), FileNum, Offset]), +% exit(normal) +% end. 
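For reference, the `CSum_info' entries written by the loop above round-trip through the encode/decode pair that appears elsewhere in this diff. A hedged sketch, not part of the patch (the 17-byte checksum size matches the `CSum' used by the timing test below):

%% Len = 8 + 4 + 17 = 29, so the full on-disk entry is 30 bytes
%% including the 1-byte length header.
csum_entry_roundtrip_example() ->
    TaggedCSum = <<"123456789abcdef0A">>,   % 17 bytes, as in the test
    Bin = encode_csum_file_entry_bin(1024, 100, TaggedCSum),
    30 = byte_size(Bin),
    {1024, 100, TaggedCSum} = decode_csum_file_entry(Bin),
    ok.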
+
+%try_write_position(FHd, Offset, Chunk) ->
+%    ok = case file:pread(FHd, Offset, 1) of %% one byte should be enough right?
+%             eof ->
+%                 ok;
+%             {ok, _} ->
+%                 {error, error_written};
+%             {error, Reason} ->
+%                 {error, Reason}
+%         end,
+%    ok = file:pwrite(FHd, Offset, Chunk),
+%    ok.
+%
+%make_listener_regname(BaseName) ->
+%    list_to_atom(atom_to_list(BaseName) ++ "_listener").
+%
+%start_append_server(_,_) -> ok.
+%start_listen_server(_,_) -> ok.
+%
+%%% This is the name of the projection store that is spawned by the
+%%% *flu*, for use primarily in testing scenarios.  In normal use, we
+%%% ought to be using the OTP style of managing processes, via
+%%% supervisors, namely via machi_flu_psup.erl, which uses a
+%%% *different* naming convention for the projection store name that it
+%%% registers.
+%
+%make_projection_server_regname(BaseName) ->
+%    list_to_atom(atom_to_list(BaseName) ++ "_pstore2").
+%
+%
+%%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
+%%% internal storage by the FLU.
+%
+%-spec encode_csum_file_entry(
+%        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
+%      iolist().
+%encode_csum_file_entry(Offset, Size, TaggedCSum) ->
+%    Len = 8 + 4 + byte_size(TaggedCSum),
+%    [<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
+%     TaggedCSum].
+%
+%%% @doc Encode `Offset + Size + TaggedCSum' into a `binary()' type for
+%%% internal storage by the FLU.
+%
+%-spec encode_csum_file_entry_bin(
+%        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
+%      binary().
+%encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
+%    Len = 8 + 4 + byte_size(TaggedCSum),
+%    <<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
+%      TaggedCSum/binary>>.
+%
+%%% @doc Decode a single `binary()' blob into an
+%%% `{Offset,Size,TaggedCSum}' tuple.
+%%%
+%%% The internal encoding (which is currently exposed to the outside world
+%%% via this function and related ones) is:
+%%%
+%%% <ul>
+%%% <li> 1 byte: record length
+%%% </li>
+%%% <li> 8 bytes (unsigned big-endian): byte offset
+%%% </li>
+%%% <li> 4 bytes (unsigned big-endian): chunk size
+%%% </li>
+%%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
+%%% </li>
+%%% </ul>
+%%%
+%%% See `machi.hrl' for the tagged checksum types, e.g.,
+%%% `?CSUM_TAG_NONE'.
+%
+%-spec decode_csum_file_entry(binary()) ->
+%          error |
+%          {machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
+%decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
+%    {Offset, Size, TaggedCSum};
+%decode_csum_file_entry(_Else) ->
+%    error.
+%
+%%% @doc Split a `binary()' blob of `checksum_list' data into a list of
+%%% unparsed `binary()' blobs, one per entry.
+%%%
+%%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if
+%%% desired.
+%%%
+%%% The return value `TrailingJunk' is unparseable bytes at the end of
+%%% the checksum list blob.
+%
+%-spec split_checksum_list_blob(binary()) ->
+%          {list(binary()), TrailingJunk::binary()}.
+%split_checksum_list_blob(Bin) ->
+%    split_checksum_list_blob(Bin, []).
+%
+%split_checksum_list_blob(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
+%    case get(hack_length) of
+%        Len -> ok;
+%        _   -> put(hack_different, true)
+%    end,
+%    split_checksum_list_blob(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
+%split_checksum_list_blob(Rest, Acc) ->
+%    {lists:reverse(Acc), Rest}.
+%
+%%% @doc Split a `binary()' blob of `checksum_list' data into a list of
+%%% `{Offset,Size,TaggedCSum}' tuples.
+%
+%-spec split_checksum_list_blob_decode(binary()) ->
+%          {list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
+%           TrailingJunk::binary()}.
+%split_checksum_list_blob_decode(Bin) ->
+%    split_checksum_list_blob_decode(Bin, []).
+%
+%split_checksum_list_blob_decode(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
+%    One = <<Len:8/unsigned-big, Part/binary>>,
+%    case decode_csum_file_entry(One) of
+%        error ->
+%            split_checksum_list_blob_decode(Rest, Acc);
+%        DecOne ->
+%            split_checksum_list_blob_decode(Rest, [DecOne|Acc])
+%    end;
+%split_checksum_list_blob_decode(Rest, Acc) ->
+%    {lists:reverse(Acc), Rest}.
+%
+%check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
+%    %% TODO: If the client was foolish enough to use
+%    %% this type of non-checksum, then the client gets
+%    %% what it deserves wrt data integrity, alas.  In
+%    %% the client-side Chain Replication method, each
+%    %% server will calculate this independently, which
+%    %% isn't exactly what ought to happen for best data
+%    %% integrity checking.  In server-side CR, the csum
+%    %% should be calculated by the head and passed down
+%    %% the chain together with the value.
+%    CS = machi_util:checksum_chunk(Chunk),
+%    machi_util:make_tagged_csum(server_sha, CS);
+%check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
+%    CS = machi_util:checksum_chunk(Chunk),
+%    if CS == Client_CSum ->
+%            machi_util:make_tagged_csum(server_sha,
+%                                        Client_CSum);
+%       true ->
+%            throw({bad_csum, CS})
+%    end.
+%
+%-ifdef(TEST).
+%
+%%% Remove "_COMMENTED" string to run the demo/exploratory code.
+%
+%timing_demo_test_COMMENTED_() ->
+%    {timeout, 300, fun() -> timing_demo_test2() end}.
+%
+%%% Demo/exploratory hackery to check relative speeds of dealing with
+%%% checksum data in different ways.
+%%%
+%%% Summary:
+%%%
+%%% * Use compact binary encoding, with 1 byte header for entry length.
+%%% * Because the hex-style code is *far* slower just for enc & dec ops.
+%%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec.
+%%% * File sorter when sorting binaries as-is is only 30-40% slower
+%%%   than an in-memory split (of huge binary emulated by file:read_file()
+%%%   "big slurp") and sort of the same as-is sortable binaries.
+%%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2}
+%%%   function must be used, i.e.
because the checksum entry lengths differ. +%%% * File sorter + {order, fun compare/2} is still *far* faster than external +%%% sort by OS X's sort(1) of sortable ASCII hex-style: +%%% 4.5 sec vs. 21 sec. +%%% * File sorter {order, fun compare/2} is faster than in-memory sort +%%% of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec. +% +%timing_demo_test2() -> +% Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)], +% CSum = <<"123456789abcdef0A">>, +% 17 = byte_size(CSum), +% io:format(user, "\n", []), +% +% %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]), +% io:format(user, "Hex-style file entry enc+dec: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {HexUSec, _} = +% timer:tc(fun() -> +% lists:foldl(fun(X, _) -> +% B = encode_csum_file_entry_hex(X, 100, CSum), +% %% file:write(ZZZ, [B, 10]), +% decode_csum_file_entry_hex(list_to_binary(B)) +% end, x, Xs) +% end), +% io:format(user, "~.3f sec\n", [HexUSec / 1000000]), +% %% %% file:close(ZZZ), +% +% io:format(user, "Not-sortable file entry enc+dec: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotSortedUSec, _} = +% timer:tc(fun() -> +% lists:foldl(fun(X, _) -> +% B = encode_csum_file_entry(X, 100, CSum), +% decode_csum_file_entry(list_to_binary(B)) +% end, x, Xs) +% end), +% io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]), +% +% NotHexList = lists:foldl(fun(X, Acc) -> +% B = encode_csum_file_entry(X, 100, CSum), +% [B|Acc] +% end, [], Xs), +% NotHexBin = iolist_to_binary(NotHexList), +% +% io:format(user, "Split NotHexBin: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotHexBinUSec, SplitRes} = +% timer:tc(fun() -> +% put(hack_length, 29), +% put(hack_different, false), +% {Sorted, _Leftover} = split_checksum_list_blob(NotHexBin), +% io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]), +% Sorted +% end), +% io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]), +% +% io:format(user, "Sort Split results: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {SortSplitUSec, _} = +% timer:tc(fun() -> +% lists:sort(SplitRes) +% %% lists:sort(fun sort_2lines/2, SplitRes) +% end), +% io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]), +% +% UnsortedName = "/tmp/foo.unsorted", +% SortedName = "/tmp/foo.sorted", +% +% ok = file:write_file(UnsortedName, NotHexList), +% io:format(user, "File Sort Split results: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {FileSortUSec, _} = +% timer:tc(fun() -> +% {ok, FHin} = file:open(UnsortedName, [read, binary]), +% {ok, FHout} = file:open(SortedName, +% [write, binary, delayed_write]), +% put(hack_sorter_sha_ctx, crypto:hash_init(sha)), +% ok = file_sorter:sort(sort_input_fun(FHin, <<>>), +% sort_output_fun(FHout), +% [{format,binary}, +% {header, 1} +% %% , {order, fun sort_2lines/2} +% ]) +% end), +% io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]), +% _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)), +% %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]), +% +% io:format(user, "NotHex-Not-sortable tuple list creation: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotHexTupleCreationUSec, NotHexTupleList} = +% timer:tc(fun() -> +% lists:foldl(fun(X, Acc) -> +% B = encode_csum_file_entry_hex( +% X, 100, CSum), +% [B|Acc] +% end, [], Xs) +% end), +% io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]), +% +% 
io:format(user, "NotHex-Not-sortable tuple list sort: ", []),
+%    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
+%    {NotHexTupleSortUSec, _} =
+%        timer:tc(fun() ->
+%                         lists:sort(NotHexTupleList)
+%                 end),
+%    io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]),
+%
+%    ok.
+%
+%sort_2lines(<<_:1/binary, A/binary>>, <<_:1/binary, B/binary>>) ->
+%    A < B.
+%
+%sort_input_fun(FH, PrevStuff) ->
+%    fun(close) ->
+%            ok;
+%       (read) ->
+%            case file:read(FH, 1024*1024) of
+%                {ok, NewStuff} ->
+%                    AllStuff = if PrevStuff == <<>> ->
+%                                       NewStuff;
+%                                  true ->
+%                                       <<PrevStuff/binary, NewStuff/binary>>
+%                               end,
+%                    {SplitRes, Leftover} = split_checksum_list_blob(AllStuff),
+%                    {SplitRes, sort_input_fun(FH, Leftover)};
+%                eof ->
+%                    end_of_input
+%            end
+%    end.
+%
+%sort_output_fun(FH) ->
+%    fun(close) ->
+%            file:close(FH);
+%       (Stuff) ->
+%            Ctx = get(hack_sorter_sha_ctx),
+%            put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)),
+%            ok = file:write(FH, Stuff),
+%            sort_output_fun(FH)
+%    end.
+%
+%encode_csum_file_entry_hex(Offset, Size, TaggedCSum) ->
+%    OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
+%    SizeHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
+%    CSumHex = machi_util:bin_to_hexstr(TaggedCSum),
+%    [OffsetHex, 32, SizeHex, 32, CSumHex].
+%
+%decode_csum_file_entry_hex(<<OffsetHex:16/binary, _:1/binary, SizeHex:8/binary, _:1/binary, CSumHex/binary>>) ->
+%    Offset = machi_util:hexstr_to_bin(OffsetHex),
+%    Size = machi_util:hexstr_to_bin(SizeHex),
+%    CSum = machi_util:hexstr_to_bin(CSumHex),
+%    {Offset, Size, CSum}.
+%
+%-endif. % TEST
diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl
new file mode 100644
index 0000000..c260421
--- /dev/null
+++ b/src/machi_flu_filename_mgr.erl
@@ -0,0 +1,167 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+%%
+%% @doc This process is responsible for managing filenames assigned to
+%% prefixes.
+%%
+%% Supported operations include finding the "current" filename assigned to
+%% a prefix, incrementing the sequence number and returning a new file name,
+%% and listing all data files assigned to a given prefix.
+%%
+%% All prefixes should have the form of `{prefix, P}'. Single filename
+%% return values have the form of `{file, F}'.
+%%
+%% <h2>Finding the current file associated with a sequence</h2>
+%% First it looks up the sequence number from the prefix name. If
+%% no sequence file is found, it uses 0 as the sequence number and searches
+%% for a matching file with the prefix and 0 as the sequence number.
+%% If no file is found, it generates a new filename by incorporating
+%% the given prefix, a randomly generated (v4) UUID and 0 as the
+%% sequence number.
+%%
+%% If the sequence number is > 0, then the process scans the filesystem
+%% looking for a filename which matches the prefix and given sequence number and
+%% returns that.
+
+-module(machi_flu_filename_mgr).
+-behavior(gen_server).
+
+-export([
+    start_link/1,
+    find_or_make_filename_from_prefix/1,
+    increment_prefix_sequence/1,
+    list_files_by_prefix/1
+    ]).
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_cast/2,
+    handle_call/3,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+    ]).
+
+-define(TIMEOUT, 10 * 1000).
+
+%% public API
+start_link(DataDir) when is_list(DataDir) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []).
+
+-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) ->
+        {file, Filename :: string()} | {error, Reason :: term() } | timeout.
+% @doc Find the latest available or make a filename from a prefix. A prefix
+% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
+% tuple in the form of `{file, F}' or an `{error, Reason}'.
+find_or_make_filename_from_prefix({prefix, Prefix}) ->
+    gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT);
+find_or_make_filename_from_prefix(Other) ->
+    lager:error("~p is not a valid prefix.", [Other]),
+    error(badarg).
+
+-spec increment_prefix_sequence( Prefix :: {prefix, string()} ) ->
+        ok | {error, Reason :: term() } | timeout.
+% @doc Increment the sequence counter for a given prefix. Prefix should
+% be in the form of `{prefix, P}'.
+increment_prefix_sequence({prefix, Prefix}) ->
+    gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT);
+increment_prefix_sequence(Other) ->
+    lager:error("~p is not a valid prefix.", [Other]),
+    error(badarg).
+
+-spec list_files_by_prefix( Prefix :: {prefix, string()} ) ->
+        [ file:name() ] | timeout | {error, Reason :: term() }.
+% @doc Given a prefix in the form of `{prefix, P}' return
+% all the data files associated with that prefix. Returns
+% a list.
+list_files_by_prefix({prefix, Prefix}) ->
+    gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT);
+list_files_by_prefix(Other) ->
+    lager:error("~p is not a valid prefix.", [Other]),
+    error(badarg).
+
+%% gen_server API
+init([DataDir]) ->
+    {ok, DataDir}.
+
+handle_cast(Req, State) ->
+    lager:warning("Got unknown cast ~p", [Req]),
+    {noreply, State}.
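The `find_filename' clause below builds names through machi_util:make_data_filename/4, which (per the machi_util change later in this diff) joins the parts with the `^' delimiter. An illustrative sketch, not part of the patch, assuming a prefix "foo" with no files on disk yet:

%% Illustration only; the UUID shown is a placeholder.
%%   {file, F} = find_or_make_filename_from_prefix({prefix, "foo"}),
%%   %% F has the shape Prefix ^ UUIDv4 ^ SeqNum, e.g. "foo^<uuid>^0",
%%   %% and splits back apart with:
%%   ["foo", _UUID, "0"] = machi_util:parse_filename({file, F}).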
+
+handle_call({find_filename, Prefix}, _From, DataDir) ->
+    N = machi_util:read_max_filenum(DataDir, Prefix),
+    File = case find_file(DataDir, Prefix, N) of
+        [] ->
+            {F, _} = machi_util:make_data_filename(
+                       DataDir,
+                       Prefix,
+                       generate_uuid_v4_str(),
+                       N),
+            F;
+        [H] -> H;
+        [Fn | _ ] = L ->
+            lager:warning(
+              "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
+              [Prefix, N, L]),
+            Fn
+    end,
+    {reply, {file, File}, DataDir};
+handle_call({increment_sequence, Prefix}, _From, DataDir) ->
+    ok = machi_util:increment_max_filenum(DataDir, Prefix),
+    {reply, ok, DataDir};
+handle_call({list_files, Prefix}, From, DataDir) ->
+    spawn(fun() ->
+              L = list_files(DataDir, Prefix),
+              gen_server:reply(From, L)
+          end),
+    {noreply, DataDir};
+
+handle_call(Req, From, State) ->
+    lager:warning("Got unknown call ~p from ~p", [Req, From]),
+    {reply, hoge, State}.
+
+handle_info(Info, State) ->
+    lager:warning("Got unknown info ~p", [Info]),
+    {noreply, State}.
+
+terminate(Reason, _State) ->
+    lager:info("Shutting down because ~p", [Reason]),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% private
+
+%% Quoted from https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
+%% MIT License
+generate_uuid_v4_str() ->
+    <<A:32, B:16, C:16, D:16, E:48>> = crypto:strong_rand_bytes(16),
+    io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
+                  [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
+
+find_file(DataDir, Prefix, N) ->
+    {_Filename, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", N),
+    filelib:wildcard(Path).
+
+list_files(DataDir, Prefix) ->
+    {F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"),
+    filelib:wildcard(F, filename:dirname(Path)).
diff --git a/src/machi_flu_listener.erl b/src/machi_flu_listener.erl
new file mode 100644
index 0000000..6f45822
--- /dev/null
+++ b/src/machi_flu_listener.erl
@@ -0,0 +1,190 @@
+% 1. start file proxy supervisor
+% 2. start projection store
+% 3. start listener
+-module(machi_flu_listener).
+-behaviour(ranch_protocol).
+
+-export([start_link/4]).
+-export([init/4]).
+
+-include("machi.hrl").
+-include("machi_pb.hrl").
+-include("machi_projection.hrl").
+
+-record(state, {
+          pb_mode,
+          high_clnt,
+          proj_store,
+          etstab,
+          epoch_id,
+          flu_name
+}).
+
+-define(SERVER_CMD_READ_TIMEOUT, 600 * 1000).
+
+start_link(Ref, Socket, Transport, Opts) ->
+    Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
+    {ok, Pid}.
+
+init(Ref, Socket, Transport, _Opts = []) ->
+    ok = ranch:accept_ack(Ref),
+    %% By default, ranch sets sockets to
+    %% {active, false}, {packet, raw}, {reuseaddr, true}
+    ok = Transport:setopts(Socket, ?PB_PACKET_OPTS),
+    loop(Socket, Transport, #state{}).
+
+loop(Socket, Transport, S) ->
+    case Transport:recv(Socket, 0, ?SERVER_CMD_READ_TIMEOUT) of
+        {ok, Bin} ->
+            {RespBin, S2} =
+                case machi_pb:decode_mpb_ll_request(Bin) of
+                    LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
+                        {R, NewS} = do_pb_ll_request(LL_req, S),
+                        {maybe_encode_response(R), mode(low, NewS)};
+                    _ ->
+                        HL_req = machi_pb:decode_mpb_request(Bin),
+                        1 = HL_req#mpb_request.do_not_alter,
+                        {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
+                        {machi_pb:encode_mpb_response(R), mode(high, NewS)}
+                end,
+            if RespBin == async_no_response ->
+                    ok;
+               true ->
+                    ok = Transport:send(Socket, RespBin)
+            end,
+            loop(Socket, Transport, S2);
+        {error, SockError} ->
+            lager:error("Socket error ~w", [SockError]),
+            (catch Transport:close(Socket))
+    end.
+ +make_high_clnt(#state{high_clnt=undefined}=S) -> + {ok, Proj} = machi_projection_store:read_latest_projection( + S#state.proj_store, private), + Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( + Proj#projection_v1.members_dict)], + {ok, Clnt} = machi_cr_client:start_link(Ps), + S#state{high_clnt=Clnt}; +make_high_clnt(S) -> + S. + +maybe_encode_response(async_no_response=X) -> + X; +maybe_encode_response(R) -> + machi_pb:encode_mpb_ll_response(R). + +mode(Mode, #state{pb_mode=undefined}=S) -> + S#state{pb_mode=Mode}; +mode(_, S) -> + S. + +do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> + Result = {high_error, 41, "Low protocol request while in high mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_ll_request(PB_request, S) -> + Req = machi_pb_translate:from_pb_request(PB_request), + {ReqID, Cmd, Result, S2} = + case Req of + {RqID, {LowCmd, _}=CMD} + when LowCmd == low_proj; + LowCmd == low_wedge_status; LowCmd == low_list_files -> + %% Skip wedge check for projection commands! + %% Skip wedge check for these unprivileged commands + {Rs, NewS} = do_pb_ll_request3(CMD, S), + {RqID, CMD, Rs, NewS}; + {RqID, CMD} -> + EpochID = element(2, CMD), % by common convention + {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), + {RqID, CMD, Rs, NewS} + end, + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_ll_request2(EpochID, CMD, S) -> + {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + if Wedged_p == true -> + {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; + is_tuple(EpochID) + andalso + EpochID /= CurrentEpochID -> + {Epoch, _} = EpochID, + {CurrentEpoch, _} = CurrentEpochID, + if Epoch < CurrentEpoch -> + ok; + true -> + %% We're at same epoch # but different checksum, or + %% we're at a newer/bigger epoch #. + wedge_myself(S#state.flu_name, CurrentEpochID), + ok + end, + {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; + true -> + do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) + end. + +do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> + {Msg, S}; +do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> + {-6, S}; +do_pb_ll_request3(Cmd, S) -> + {execute_cmd(Cmd), S}. +%do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag, +% CSum, ChunkExtra}, S) -> +% {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, +% ChunkExtra, S), S}; +%do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, +% CSum}, S) -> +% {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; +%do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) -> +% {do_server_read_chunk(File, Offset, Size, Opts, S), S}; +%do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) -> +% {do_server_checksum_listing(File, S), S}; +%do_pb_ll_request3({low_list_files, _EpochID}, S) -> +% {do_server_list_files(S), S}; +%do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> +% {do_server_wedge_status(S), S}; +%do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) -> +% {do_server_delete_migration(File, S), S}; +%do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) -> +% {do_server_trunc_hack(File, S), S}; +%do_pb_ll_request3({low_proj, PCMD}, S) -> +% {do_server_proj_request(PCMD, S), S}. +execute_cmd(_Cmd) -> + ok. 
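The epoch gate in do_pb_ll_request2/3 above boils down to a small decision table. A distilled restatement, not part of the module (`EpochID' here is the `{EpochNumber, EpochCSum}' pair used throughout):

%% Wedged servers reject everything; a stale epoch gets bad_epoch
%% without side effects; a different csum at a same-or-newer epoch
%% number wedges this server before returning bad_epoch.
epoch_gate(true = _Wedged, _ReqEpochID, _CurEpochID) ->
    {error, wedged};
epoch_gate(false, {ReqE, _}, {CurE, _}) when ReqE < CurE ->
    {error, bad_epoch};
epoch_gate(false, ReqEpochID, CurEpochID) when ReqEpochID /= CurEpochID ->
    {wedge_myself_then, {error, bad_epoch}};
epoch_gate(false, _ReqEpochID, _CurEpochID) ->
    proceed.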
+ + +do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> + Result = {low_error, 41, "High protocol request while in low mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_hl_request(PB_request, S) -> + {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), + {Result, S2} = do_pb_hl_request2(Cmd, S), + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_hl_request2({high_echo, Msg}, S) -> + {Msg, S}; +do_pb_hl_request2({high_auth, _User, _Pass}, S) -> + {-77, S}; +do_pb_hl_request2({high_append_chunk, _todoPK, Prefix, ChunkBin, TaggedCSum, + ChunkExtra}, #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk, + ChunkExtra), + {Res, S}; +do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, + #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), + {Res, S}; +do_pb_hl_request2({high_read_chunk, File, Offset, Size}, + #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size), + {Res, S}; +do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:checksum_list(Clnt, File), + {Res, S}; +do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:list_files(Clnt), + {Res, S}. + +wedge_myself(_, _) -> ok. + diff --git a/src/machi_flu_manager.erl b/src/machi_flu_manager.erl new file mode 100644 index 0000000..fcad49c --- /dev/null +++ b/src/machi_flu_manager.erl @@ -0,0 +1,149 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_flu_manager). + +-behaviour(gen_server). + +-include("machi_flu.hrl"). %% contains state record + +%% Public API +-export([ + start_link/1, + start/1, + stop/1 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_cast/2, + handle_call/3, + handle_info/2, + terminate/2, + code_change/3 +]). + +%% PUBLIC API + +start_link(S = #state{flu_name = Name}) -> + gen_server:start_link({local, Name}, ?MODULE, [S], []). + +%% TODO Make this a functional thing +start(_FluName) -> + ok. + +%% TODO Make this functional +stop(_) -> ok. + +%% gen_server callbacks +init(S = #state{flu_name = N, epoch_id = EpochId, wedged = W}) -> + Tid = ets:new(make_name(N, "_epoch"), [set, protected, named_table, {read_concurrency, true}]), + true = ets:insert(Tid, {epoch, {W, EpochId}}), + {ok, S#state{etstab=Tid}}. + +handle_cast(Req, S) -> + lager:warning("Unexpected cast ~p", [Req]), + {noreply, S}. + +handle_call(Req, _From, S) -> + lager:warning("Unexpected call ~p", [Req]), + {reply, unexpected, S}. 
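The manager's init/1 above and the listener's do_pb_ll_request2 share wedge state through that named ETS table: the manager owns and writes the `{epoch, {Wedged_p, EpochID}}' row, the listener only reads it. A hedged reader sketch, assuming a FLU named `flu_a' (so the table is named `flu_a_epoch'):

%% Illustrative only, not part of the patch.
current_wedge_state() ->
    {_Wedged_p, _EpochID} = ets:lookup_element(flu_a_epoch, epoch, 2).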
+ +handle_info({wedge_myself, _EpochId}, S = #state{wedged = true}) -> + lager:debug("Request to wedge myself, but I'm already wedged. Ignoring."), + {noreply, S}; +handle_info({wedge_myself, EpochId}, S = #state{flu_name = N, + wedged = false, + epoch_id = E, + etstab = Tid}) when EpochId == E -> + true = ets:insert(Tid, {epoch, {true, E}}), + kick_chain_manager(N), + {noreply, S#state{wedged=true}}; + +handle_info({wedge_state_change, Bool, {NewEpoch, _}}, + S = #state{epoch_id = undefined, etstab=Tid}) -> + true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}), + {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}}; +handle_info({wedge_state_change, Bool, {NewEpoch, _}}, + S = #state{epoch_id = E, etstab = Tid}) when NewEpoch >= E -> + true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}), + {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}}; +handle_info(M = {wedge_state_change, _Bool, {NewEpoch, _}}, + S = #state{epoch_id = E}) when NewEpoch < E -> + lager:debug("Wedge state change message ~p, but my epoch id is higher (~p). Ignoring.", + [M, E]), + {noreply, S}; + +handle_info({wedge_status, From}, S = #state{wedged = W, epoch_id = E}) -> + From ! {wedge_status_reply, W, E}, + {noreply, S}; + +handle_info({seq_append, From, _Prefix, _Chunk, _Csum, _Extra, _EpochId}, + S = #state{wedged = true}) -> + From ! wedged, + {noreply, S}; + +handle_info({seq_append, From, Prefix, Chunk, Csum, Extra, EpochId}, + S = #state{epoch_id = EpochId}) -> + handle_append(From, Prefix, Chunk, Csum, Extra), + {noreply, S}; + +handle_info(Info, S) -> + lager:warning("Unexpected info ~p", [Info]), + {noreply, S}. + +terminate(Reason, _S) -> + lager:info("Terminating because ~p", [Reason]), + ok. + +code_change(_Old, S, _Extra) -> + {ok, S}. + +%% private +kick_chain_manager(Name) -> + Chmgr = machi_chain_manager1:make_chmgr_regname(Name), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end). + +handle_append(From, Prefix, Chunk, Csum, Extra) -> + spawn(fun() -> + dispatch_append(From, Prefix, Chunk, Csum, Extra) + end). + +dispatch_append(From, Prefix, Chunk, Csum, Extra) -> + {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(Prefix), + {Tag, CS} = machi_util:unmake_tagged_csum(Csum), + try + {ok, Filename, Offset} = machi_flu_file_proxy:append(Pid, + [{client_csum_tag, Tag}, {client_csum, CS}], + Extra, Chunk), + From ! {assignment, Offset, Filename}, + exit(normal) + catch + _Type:Reason -> + lager:error("Could not append chunk to prefix ~p because ~p", + [Prefix, Reason]), + exit(Reason) + end. + +make_name(N, Suffix) -> + atom_to_list(N) ++ Suffix. diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl new file mode 100644 index 0000000..0fbafa9 --- /dev/null +++ b/src/machi_flu_metadata_mgr.erl @@ -0,0 +1,217 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. 
+%% +%% ------------------------------------------------------------------- + +%% @doc This is a metadata service for the machi FLU which currently +%% tracks the mappings between filenames and file proxies. +%% +%% The service takes a given hash space and spreads it out over a +%% pool of N processes which are responsible for 1/Nth the hash +%% space. When a user requests an operation on a particular file +%% the filename is hashed into the hash space and the request +%% forwarded to a particular manager responsible for that slice +%% of the hash space. +%% +%% The current hash implementation is `erlang:phash2/1' which has +%% a range between 0..2^27-1 or 134,217,727. + +-module(machi_flu_metadata_mgr). +-behaviour(gen_server). + + +-define(MAX_MGRS, 10). %% number of managers to start by default. +-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use +-define(TIMEOUT, 10 * 1000). %% 10 second timeout + +-record(state, {name :: atom(), + datadir :: string(), + tid :: ets:tid() + }). + +%% This record goes in the ets table where prefix is the key +-record(md, {filename :: string(), + proxy_pid :: undefined|pid(), + mref :: undefined|reference() %% monitor ref for file proxy + }). + +%% public api +-export([ + start_link/2, + lookup_manager_pid/1, + lookup_proxy_pid/1, + start_proxy_pid/1, + stop_proxy_pid/1 + ]). + +%% gen_server callbacks +-export([ + init/1, + handle_cast/2, + handle_call/3, + handle_info/2, + terminate/2, + code_change/3 + ]). + +%% Public API + +start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) -> + gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []). + +lookup_manager_pid({file, Filename}) -> + whereis(get_manager_atom(Filename)). + +lookup_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT). + +start_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT). + +stop_proxy_pid({file, Filename}) -> + gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). + +%% gen_server callbacks +init([Name, DataDir]) -> + Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]), + {ok, #state{ name = Name, datadir = DataDir, tid = Tid}}. + +handle_cast(Req, State) -> + lager:warning("Got unknown cast ~p", [Req]), + {noreply, State}. + +handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> + Reply = case lookup_md(Tid, Filename) of + not_found -> undefined; + R -> R#md.proxy_pid + end, + {reply, Reply, State}; + +handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) -> + NewR = case lookup_md(Tid, Filename) of + not_found -> + start_file_proxy(D, Filename); + #md{ proxy_pid = undefined } = R0 -> + start_file_proxy(D, R0); + #md{ proxy_pid = _Pid } = R1 -> + R1 + end, + update_ets(Tid, NewR), + {reply, {ok, NewR#md.proxy_pid}, State}; +handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> + case lookup_md(Tid, Filename) of + not_found -> + ok; + #md{ proxy_pid = undefined } -> + ok; + #md{ proxy_pid = Pid, mref = M } = R -> + demonitor(M, [flush]), + machi_file_proxy:stop(Pid), + update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }) + end, + {reply, ok, State}; + +handle_call(Req, From, State) -> + lager:warning("Got unknown call ~p from ~p", [Req, From]), + {reply, hoge, State}. 
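Routing in this module is just erlang:phash2/1 modulo the pool size, as the @doc above describes; a distilled restatement of get_manager_atom/1 and friends (the real helpers appear in the private-functions section below):

%% Illustrative only; assumes the default pool size, ?MAX_MGRS = 10,
%% so every request for the same filename lands on the same manager.
manager_for(Filename) ->
    N = erlang:phash2(Filename) rem 10,
    list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)).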
+ +handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) -> + lager:debug("file proxy ~p shutdown normally", [Pid]), + clear_ets(Tid, Mref), + {noreply, State}; + +handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ tid = Tid }) -> + lager:info("file proxy ~p shutdown because of file rollover", [Pid]), + R = get_md_record_by_mref(Tid, Mref), + [Prefix | _Rest] = machi_util:parse_filename({file, R#md.filename}), + + %% We only increment the counter here. The filename will be generated on the + %% next append request to that prefix and since the filename will have a new + %% sequence number it probably will be associated with a different metadata + %% manager. That's why we don't want to generate a new file name immediately + %% and use it to start a new file proxy. + ok = machi_flu_filename_mgr:increment_prefix_sequence({prefix, Prefix}), + + %% purge our ets table of this entry completely since it is likely the + %% new filename (whenever it comes) will be in a different manager than + %% us. + purge_ets(Tid, R), + {noreply, State}; + +handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) -> + lager:error("file proxy ~p shutdown because it's wedged", [Pid]), + clear_ets(Tid, Mref), + {noreply, State}; +handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) -> + lager:error("file proxy ~p shutdown because ~p", [Pid, Error]), + clear_ets(Tid, Mref), + {noreply, State}; + + +handle_info(Info, State) -> + lager:warning("Got unknown info ~p", [Info]), + {noreply, State}. + +terminate(Reason, _State) -> + lager:info("Shutting down because ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Private functions + +compute_hash(Data) -> + ?HASH(Data). + +compute_worker(Hash) -> + Hash rem ?MAX_MGRS. + +build_metadata_mgr_name(N) when is_integer(N) -> + list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)). + +get_manager_atom(Data) -> + build_metadata_mgr_name(compute_worker(compute_hash(Data))). + +lookup_md(Tid, Data) -> + case ets:lookup(Tid, Data) of + [] -> not_found; + [R] -> R + end. + +start_file_proxy(D, R = #md{filename = F} ) -> + {ok, Pid} = machi_file_proxy_sup:start_proxy(D, F), + Mref = monitor(process, Pid), + R#md{ proxy_pid = Pid, mref = Mref }; + +start_file_proxy(D, Filename) -> + start_file_proxy(D, #md{ filename = Filename }). + +update_ets(Tid, R) -> + ets:insert(Tid, R). + +clear_ets(Tid, Mref) -> + R = get_md_record_by_mref(Tid, Mref), + update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }). + +purge_ets(Tid, R) -> + ok = ets:delete_object(Tid, R). + +get_md_record_by_mref(Tid, Mref) -> + [R] = ets:match_object(Tid, {md, '_', '_', Mref}), + R. diff --git a/src/machi_util.erl b/src/machi_util.erl index b330dc6..30bf1b7 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -36,7 +36,7 @@ make_projection_filename/2, read_max_filenum/2, increment_max_filenum/2, info_msg/2, verb/1, verb/2, - mbytes/1, + mbytes/1, parse_filename/1, %% TCP protocol helpers connect/2, connect/3, %% List twiddling @@ -89,8 +89,7 @@ make_checksum_filename(DataDir, FileName) -> -spec make_data_filename(string(), string(), atom()|string()|binary(), integer()) -> {binary(), string()}. 
make_data_filename(DataDir, Prefix, SequencerName, FileNum) -> - File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w", - [Prefix, SequencerName, FileNum])), + File = erlang:iolist_to_binary(string:join([Prefix, SequencerName, integer_to_list(FileNum)], ?FN_DELIMITER)), FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])), {File, FullPath}. @@ -262,6 +261,10 @@ mbytes(0) -> mbytes(Size) -> lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])). +-spec parse_filename( Filename :: {file, string()} ) -> [ string() ]. +parse_filename({file, F}) -> + string:tokens(F, ?FN_DELIMITER). + %% @doc Log an 'info' level message. -spec info_msg(string(), list()) -> term(). diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl new file mode 100644 index 0000000..39b0852 --- /dev/null +++ b/test/machi_file_proxy_eqc.erl @@ -0,0 +1,328 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_file_proxy_eqc). + +-ifdef(TEST). +-ifdef(EQC). +-compile(export_all). +-include("machi.hrl"). +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_statem.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)). + + +%% EUNIT TEST DEFINITION +eqc_test_() -> + {timeout, 60, + {spawn, + [ + {timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))} + ] + }}. + +%% SHELL HELPERS +test() -> + test(100). + +test(N) -> + quickcheck(numtests(N, prop_ok())). + +check() -> + check(prop_ok(), current_counterexample()). + +%% GENERATORS + +csum_type() -> + elements([?CSUM_TAG_NONE, ?CSUM_TAG_CLIENT_SHA, ?CSUM_TAG_SERVER_SHA]). + +csum(Type, Binary) -> + case Type of + ?CSUM_TAG_NONE -> <<>>; + _ -> machi_util:checksum_chunk(Binary) + end. + +position(P) -> + ?LET(O, offset(), P + O). + +offset() -> + ?SUCHTHAT(X, int(), X >= 0). + +offset_base() -> + elements([4096, 6144, 7168, 8192, 20480, 100000, 1000000]). + +big_offset() -> + ?LET(P, int(), ?LET(X, offset_base(), P+X)). + +len() -> + ?SUCHTHAT(X, int(), X >= 1). + +data_with_csum() -> + ?LET({B,T},{eqc_gen:largebinary(), csum_type()}, {B,T, csum(T, B)}). + %?LET({B,T},{eqc_gen:binary(), csum_type()}, {B,T, csum(T, B)}). + +data_with_csum(Limit) -> + %?LET({B,T},{?LET(S, Limit, eqc_gen:largebinary(S)), csum_type()}, {B,T, csum(T, B)}). + ?LET({B,T},{?LET(S, Limit, eqc_gen:binary(S)), csum_type()}, {B,T, csum(T, B)}). + +intervals([]) -> + []; +intervals([N]) -> + [{N, choose(1,150)}]; +intervals([A,B|T]) -> + [{A, choose(1, B-A)}|intervals([B|T])]. + +interval_list() -> + ?LET(L, list(choose(1024, 4096)), intervals(lists:usort(L))). + +shuffle_interval() -> + ?LET(L, interval_list(), shuffle(L)). 
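To make the interval generators above concrete: intervals/1 turns a sorted list of offsets into non-overlapping {Offset, LengthGen} pairs, where each length is capped by the gap to the next offset. For example (illustrative offsets):

%%   intervals([1200, 2000, 3500]) ->
%%       [{1200, choose(1, 800)},
%%        {2000, choose(1, 1500)},
%%        {3500, choose(1, 150)}]    % the last interval gets 1..150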
+ +get_written_interval(L) -> + ?LET({O, Ln}, elements(L), {O+1, Ln-1}). + +%% INITIALIZATION + +-record(state, {pid, prev_extra = 0, planned_writes=[], written=[]}). + +initial_state() -> #state{written=[{0,1024}]}. +initial_state(I) -> #state{written=[{0,1024}], planned_writes=I}. + +weight(_S, rewrite) -> 1; +weight(_S, _) -> 2. + +%% HELPERS + +%% check if an operation is permitted based on whether a write has +%% occurred +check_writes(_Op, [], _Off, _L) -> + false; +check_writes(_Op, [{Pos, Sz}|_T], Off, L) when Pos == Off + andalso Sz == L -> + mostly_true; +check_writes(read, [{Pos, Sz}|_T], Off, L) when Off >= Pos + andalso Off < (Pos + Sz) + andalso Sz >= ( L - ( Off - Pos ) ) -> + true; +check_writes(write, [{Pos, Sz}|_T], Off, L) when ( Off + L ) > Pos + andalso Off < (Pos + Sz) -> + true; +check_writes(Op, [_H|T], Off, L) -> + check_writes(Op, T, Off, L). + +is_error({error, _}) -> true; +is_error({error, _, _}) -> true; +is_error(Other) -> {expected_ERROR, Other}. + +probably_error(ok) -> true; +probably_error(V) -> is_error(V). + +is_ok({ok, _, _}) -> true; +is_ok(ok) -> true; +is_ok(Other) -> {expected_OK, Other}. + +get_offset({ok, _Filename, Offset}) -> Offset; +get_offset(_) -> error(badarg). + +offset_valid(Offset, Extra, L) -> + {Pos, Sz} = lists:last(L), + Offset == Pos + Sz + Extra. + +-define(TESTDIR, "./eqc"). + +cleanup() -> + [begin + Fs = filelib:wildcard(?TESTDIR ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(?TESTDIR), + ok. + +%% start + +start_pre(S) -> + S#state.pid == undefined. + +start_command(S) -> + {call, ?MODULE, start, [S]}. + +start(_S) -> + {_, _, MS} = os:timestamp(), + File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS), + {ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR), + unlink(Pid), + Pid. + +start_next(S, Pid, _Args) -> + S#state{pid = Pid}. + +%% read + +read_pre(S) -> + S#state.pid /= undefined. + +read_args(S) -> + [S#state.pid, offset(), len()]. + +read_ok(S, Off, L) -> + case S#state.written of + [{0, 1024}] -> false; + W -> check_writes(read, W, Off, L) + end. + +read_post(S, [_Pid, Off, L], Res) -> + case read_ok(S, Off, L) of + true -> is_ok(Res); + mostly_true -> is_ok(Res); + false -> is_error(Res) + end. + +read_next(S, _Res, _Args) -> S. + +read(Pid, Offset, Length) -> + machi_file_proxy:read(Pid, Offset, Length). + +%% write + +write_pre(S) -> + S#state.pid /= undefined andalso S#state.planned_writes /= []. + +%% do not allow writes with empty data +write_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) -> + false; +write_pre(_S, _Args) -> + true. + +write_args(S) -> + {Off, Len} = hd(S#state.planned_writes), + [S#state.pid, Off, data_with_csum(Len)]. + +write_ok(_S, [_Pid, Off, _Data]) when Off < 1024 -> false; +write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) -> + Size = iolist_size(Bin), + %% Check writes checks if a byte range is *written* + %% So writes are ok IFF they are NOT written, so + %% we want not check_writes/3 to be true. + check_writes(write, S#state.written, Off, Size). + +write_post(S, Args, Res) -> + case write_ok(S, Args) of + %% false means this range has NOT been written before, so + %% it should succeed + false -> eq(Res, ok); + %% mostly true means we've written this range before BUT + %% as a special case if we get a call to write the EXACT + %% same data that's already on the disk, we return "ok" + %% instead of {error, written}. 
+ mostly_true -> probably_error(Res); + %% If we get true, then we've already written this section + %% or a portion of this range to disk and should return an + %% error. + true -> is_error(Res) + end. + +write_next(S, Res, [_Pid, Offset, {Bin, _Tag, _Csum}]) -> + S0 = case is_ok(Res) of + true -> + S#state{written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}]) }; + _ -> + S + end, + S0#state{prev_extra = 0, planned_writes=tl(S0#state.planned_writes)}. + + +write(Pid, Offset, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:write(Pid, Offset, Meta, Bin). + +%% append + +append_pre(S) -> + S#state.pid /= undefined. + +%% do not allow appends with empty binary data +append_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) -> + false; +append_pre(_S, _Args) -> + true. + +append_args(S) -> + [S#state.pid, default(0, len()), data_with_csum()]. + +append(Pid, Extra, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:append(Pid, Meta, Extra, Bin). + +append_next(S, Res, [_Pid, Extra, {Bin, _Tag, _Csum}]) -> + case is_ok(Res) of + true -> + Offset = get_offset(Res), + true = offset_valid(Offset, S#state.prev_extra, S#state.written), + S#state{prev_extra = Extra, written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}])}; + _ -> + S + end. + +%% appends should always succeed unless the disk is full +%% or there's a hardware failure. +append_post(_S, _Args, Res) -> + true == is_ok(Res). + +%% rewrite + +rewrite_pre(S) -> + S#state.pid /= undefined andalso S#state.written /= []. + +rewrite_args(S) -> + ?LET({Off, Len}, get_written_interval(S#state.written), + [S#state.pid, Off, data_with_csum(Len)]). + +rewrite(Pid, Offset, {Bin, Tag, Csum}) -> + Meta = [{client_csum_tag, Tag}, + {client_csum, Csum}], + machi_file_proxy:write(Pid, Offset, Meta, Bin). + +rewrite_post(_S, _Args, Res) -> + is_error(Res). + +rewrite_next(S, _Res, _Args) -> + S#state{prev_extra = 0}. + +%% Property + +prop_ok() -> + cleanup(), + ?FORALL(I, shuffle_interval(), + ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I)), + begin + {H, S, Res} = run_parallel_commands(?MODULE, Cmds), + pretty_commands(?MODULE, Cmds, {H, S, Res}, + aggregate(command_names(Cmds), Res == ok)) + end) + ). + +-endif. % EQC +-endif. % TEST diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl new file mode 100644 index 0000000..bcb0085 --- /dev/null +++ b/test/machi_file_proxy_test.erl @@ -0,0 +1,103 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_file_proxy_test). + +-ifdef(TEST). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include("machi.hrl"). 
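+
+%% To run this suite from a shell once the beams are built (standard
+%% eunit API; the option list is optional):
+%%     1> eunit:test(machi_file_proxy_test, [verbose]).
+%% The EQC model in machi_file_proxy_eqc exports test/0, test/1 and
+%% check/0 as the equivalent shell helpers.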
+
+clean_up_data_dir(DataDir) ->
+    [begin
+         Fs = filelib:wildcard(DataDir ++ Glob),
+         [file:delete(F) || F <- Fs],
+         [file:del_dir(F) || F <- Fs]
+     end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
+    _ = file:del_dir(DataDir),
+    ok.
+
+-ifndef(PULSE).
+
+-define(TESTDIR, "./t").
+-define(HYOOGE, 1 * 1024 * 1024 * 1024). % 1 long GB
+
+random_binary_single() ->
+    %% OK, I guess it's not that random...
+    <<"Four score and seven years ago our fathers brought forth on this
+    continent a new nation, conceived in liberty, and dedicated to the
+    proposition that all men are created equal.
+
+    Now we are engaged in a great civil war, testing whether that nation, or any
+    nation so conceived and so dedicated, can long endure. We are met on a great
+    battlefield of that war. We have come to dedicate a portion of that field, as a
+    final resting place for those who here gave their lives that that nation
+    might live. It is altogether fitting and proper that we should do this.
+
+    But, in a larger sense, we can not dedicate, we can not consecrate, we can not
+    hallow this ground. The brave men, living and dead, who struggled here, have
+    consecrated it, far above our poor power to add or detract. The world will
+    little note, nor long remember what we say here, but it can never forget what
+    they did here. It is for us the living, rather, to be dedicated here to the
+    unfinished work which they who fought here have thus far so nobly advanced. It
+    is rather for us to be here dedicated to the great task remaining before us—
+    that from these honored dead we take increased devotion to that cause for which
+    they gave the last full measure of devotion— that we here highly resolve that
+    these dead shall not have died in vain— that this nation, under God, shall have
+    a new birth of freedom— and that government of the people, by the people, for
+    the people, shall not perish from the earth.">>.
+
+random_binary(Start, End) ->
+    Size = byte_size(random_binary_single()) - 1,
+    case End > Size of
+        true ->
+            Copies = ( End div Size ) + 1,
+            D0 = binary:copy(random_binary_single(), Copies),
+            binary:part(<<D0/binary>>, Start, End);
+        false ->
+            binary:part(random_binary_single(), Start, End)
+    end.
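+
+%% NB: binary:part(Subject, Pos, Len) takes a length, not an end
+%% position, as its third argument, so random_binary(Start, End)
+%% actually returns End bytes beginning at Start. Every call site
+%% below passes Start = 0, where the two readings coincide.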
+ +machi_file_proxy_test_() -> + clean_up_data_dir(?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), + [ + ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), + ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), + ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), + ?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), + ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) + ]. + +-endif. % !PULSE +-endif. % TEST. +
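
A minimal shell sketch of the write-once behavior these suites pin down (the prefix and
directory names here are arbitrary; the return shapes follow the specs in
machi_file_proxy.erl, and the final catch mirrors the normal-exit assertion in the eunit
list above):

    1> {ok, Pid} = machi_file_proxy:start_link("scratch", "./t-manual").
    2> {ok, _File, Off} = machi_file_proxy:append(Pid, <<"write once">>).
    3> machi_file_proxy:read(Pid, Off, 10).          %% {ok, <<"write once">>, Csum}
    4> machi_file_proxy:write(Pid, Off, <<"oops">>). %% {error, written}: offsets are write-once
    5> catch machi_file_proxy:stop(Pid).             %% the proxy exits with reason normal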