Enforce write-once property #1
13 changed files with 2565 additions and 1159 deletions
.gitignore (vendored, +2)
@@ -7,6 +7,8 @@ erl_crash.dump
 .concrete/DEV_MODE
 .rebar
 edoc
+# ignore vim swap files
+*.swp
 
 # PB artifacts for Erlang
 include/machi_pb.hrl
include/machi.hrl
@@ -23,6 +23,7 @@
 %% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
 -define(DATA_DIR, "./data").
 -define(MINIMUM_OFFSET, 1024).
+-define(FN_DELIMITER, "^").
 
 %% 0th draft of checksum typing with 1st byte.
 -define(CSUM_TAG_NONE, 0). % No csum provided by client
rebar.config
@@ -6,6 +6,7 @@
 {deps, [
  {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}},
- {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}
+ {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
+ {ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {tag, "1.1.0"}}}
 ]}.
src/machi_file_proxy.erl (new file, 778 lines)
@@ -0,0 +1,778 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc This is a proxy process which mediates access to Machi FLU
%% controlled files. In particular, it manages the "write-once register"
%% conceit at the heart of Machi's design.
%%
%% Read, write and append requests for a single file will be managed
%% through this proxy. Clients can also request syncs for specific
%% types of filehandles.
%%
%% As operations are requested, the proxy keeps track of how many
%% operations it has performed (and how many errors were generated).
%% After a sufficient number of quiescent ticks, the server terminates
%% itself.
%%
%% TODO:
%% 1. Some way to transition the proxy into a wedged state that
%% doesn't rely on message delivery.
%%
%% 2. Check max file size on appends. Writes we take on faith we can
%% and should handle.
%%
%% 3. Async checksum reads on startup.

-module(machi_file_proxy).
-behaviour(gen_server).

-include("machi.hrl").

%% public API
-export([
    start_link/2,
    stop/1,
    sync/1,
    sync/2,
    read/3,
    write/3,
    write/4,
    append/2,
    append/4
]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).

-type op_stats() :: { Total :: non_neg_integer(),
                      Errors :: non_neg_integer() }.

-type byte_sequence() :: { Offset :: non_neg_integer(),
                           Size :: pos_integer()|infinity }.

-record(state, {
    data_dir :: string() | undefined,
    filename :: string() | undefined,
    data_path :: string() | undefined,
    wedged = false :: boolean(),
    csum_file :: string()|undefined,
    csum_path :: string()|undefined,
    eof_position = 0 :: non_neg_integer(),
    unwritten_bytes = [] :: [byte_sequence()],
    data_filehandle :: file:filehandle(),
    csum_filehandle :: file:filehandle(),
    tref :: reference(), %% timer ref
    ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
    ops = 0 :: non_neg_integer(), %% sum of all ops
    reads = {0, 0} :: op_stats(),
    writes = {0, 0} :: op_stats(),
    appends = {0, 0} :: op_stats()
}).

%% Public API

% @doc Start a new instance of the file proxy service. Takes the filename
% and data directory as arguments. This function is typically called by the
% `machi_file_proxy_sup:start_proxy/2' function.
-spec start_link(Filename :: string(), DataDir :: string()) -> any().
start_link(Filename, DataDir) ->
    gen_server:start_link(?MODULE, {Filename, DataDir}, []).

% @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
    gen_server:call(Pid, {stop}, ?TIMEOUT).

% @doc Force a sync of all filehandles.
-spec sync(Pid :: pid()) -> ok|{error, term()}.
sync(Pid) when is_pid(Pid) ->
    sync(Pid, all);
sync(_Pid) ->
    lager:warning("Bad pid to sync"),
    {error, bad_arg}.

% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
sync(Pid, Type) when is_pid(Pid) andalso
                     ( Type =:= all orelse Type =:= csum orelse Type =:= data ) ->
    gen_server:call(Pid, {sync, Type}, ?TIMEOUT);
sync(_Pid, Type) ->
    lager:warning("Bad arg to sync: Type ~p", [Type]),
    {error, bad_arg}.

% @doc Read the file at the given offset for the given length.
-spec read(Pid :: pid(),
           Offset :: non_neg_integer(),
           Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
                                           {error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
                               andalso is_integer(Length) andalso Length > 0 ->
    gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
read(_Pid, Offset, Length) ->
    lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
    {error, bad_arg}.

% @doc Write data at offset.
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
                              andalso is_binary(Data) ->
    write(Pid, Offset, [], Data);
write(_Pid, Offset, _Data) ->
    lager:warning("Bad arg to write: Offset ~p", [Offset]),
    {error, bad_arg}.

% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
% that expects the following keys and values:
% <ul>
% <li>`client_csum_tag' - the type of checksum from the client as defined in the machi.hrl file</li>
% <li>`client_csum' - the checksum value from the client</li>
% </ul>
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
            Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, ClientMeta, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
                                          andalso is_list(ClientMeta) andalso is_binary(Data) ->
    gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT);
write(_Pid, Offset, ClientMeta, _Data) ->
    lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]),
    {error, bad_arg}.

% @doc Append data.
-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
                                                |{error, term()}.
append(Pid, Data) when is_pid(Pid) andalso is_binary(Data) ->
    append(Pid, [], 0, Data);
append(_Pid, _Data) ->
    lager:warning("Bad arguments to append/2"),
    {error, bad_arg}.

% @doc Append data to file, supplying client metadata and (if desired) a
% reservation for additional space. ClientMeta is a proplist and expects the
% same keys as write/4.
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
             Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
                                                              |{error, term()}.
append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta)
                                          andalso is_integer(Extra) andalso Extra >= 0
                                          andalso is_binary(Data) ->
    gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT);
append(_Pid, ClientMeta, Extra, _Data) ->
    lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]),
    {error, bad_arg}.
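For illustration, a minimal client session against a running proxy might look like this (a sketch only, with a hypothetical filename and data directory; the `{error, written}' on the second write is the write-once property this change enforces):

    {ok, Pid} = machi_file_proxy:start_link("test_file", "./data"),
    %% Append allocates the offset for us and returns it.
    {ok, _File, Offset} = machi_file_proxy:append(Pid, <<"hello">>),
    %% Reading the range back returns the bytes plus their tagged checksum.
    {ok, <<"hello">>, _Csum} = machi_file_proxy:read(Pid, Offset, 5),
    %% Writing the same (already-written) range again must fail.
    {error, written} = machi_file_proxy:write(Pid, Offset, <<"world">>).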
%% gen_server callbacks

% @private
init({Filename, DataDir}) ->
    CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
    {_, DPath} = machi_util:make_data_filename(DataDir, Filename),
    ok = filelib:ensure_dir(CsumFile),
    ok = filelib:ensure_dir(DPath),
    UnwrittenBytes = parse_csum_file(CsumFile),
    {Eof, infinity} = lists:last(UnwrittenBytes),
    {ok, FHd} = file:open(DPath, [read, write, binary, raw]),
    {ok, FHc} = file:open(CsumFile, [append, binary, raw]),
    Tref = schedule_tick(),
    St = #state{
        filename = Filename,
        data_dir = DataDir,
        data_path = DPath,
        csum_file = CsumFile,
        data_filehandle = FHd,
        csum_filehandle = FHc,
        tref = Tref,
        unwritten_bytes = UnwrittenBytes,
        eof_position = Eof},
    lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
                [self(), Filename, St]),
    {ok, St}.

% @private
handle_call({stop}, _From, State) ->
    lager:debug("Requested to stop."),
    {stop, normal, State};

handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
    R = file:sync(FHd),
    {reply, R, State};

handle_call({sync, csum}, _From, State = #state{ csum_filehandle = FHc }) ->
    R = file:sync(FHc),
    {reply, R, State};

handle_call({sync, all}, _From, State = #state{filename = F,
                                               data_filehandle = FHd,
                                               csum_filehandle = FHc
                                              }) ->
    R = file:sync(FHc),
    R1 = file:sync(FHd),
    Resp = case {R, R1} of
        {ok, ok} -> ok;
        {ok, O1} ->
            lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
            O1;
        {O2, ok} ->
            lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
            O2;
        {O3, O4} ->
            lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]),
            {O3, O4}
    end,
    {reply, Resp, State};

%%% READS

handle_call({read, _Offset, _Length}, _From,
            State = #state{wedged = true,
                           reads = {T, Err}
                          }) ->
    {reply, {error, wedged}, State#state{reads = {T + 1, Err + 1}}};

handle_call({read, Offset, Length}, _From,
            State = #state{eof_position = Eof,
                           reads = {T, Err}
                          }) when Offset + Length > Eof ->
    lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
                [Offset, Length, Eof]),
    {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};

handle_call({read, Offset, Length}, _From,
            State = #state{filename = F,
                           data_filehandle = FH,
                           unwritten_bytes = U,
                           reads = {T, Err}
                          }) ->

    Checksum = get({Offset, Length}), %% N.B. May be 'undefined'!

    {Resp, NewErr} = case handle_read(FH, F, Checksum, Offset, Length, U) of
        {ok, Bytes, Csum} ->
            {{ok, Bytes, Csum}, Err};
        eof ->
            {{error, not_written}, Err + 1};
        Error ->
            {Error, Err + 1}
    end,
    {reply, Resp, State#state{reads = {T+1, NewErr}}};

%%% WRITES

handle_call({write, _Offset, _ClientMeta, _Data}, _From,
            State = #state{wedged = true,
                           writes = {T, Err}
                          }) ->
    {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};

handle_call({write, Offset, ClientMeta, Data}, _From,
            State = #state{unwritten_bytes = U,
                           filename = F,
                           writes = {T, Err},
                           data_filehandle = FHd,
                           csum_filehandle = FHc
                          }) ->

    ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
    ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),

    {Resp, NewErr, NewU} =
        case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
            {error, {bad_csum, Bad}} ->
                lager:error("Bad checksum on write; client sent ~p, we computed ~p",
                            [ClientCsum, Bad]),
                {{error, bad_csum}, Err + 1, U};
            TaggedCsum ->
                case handle_write(FHd, FHc, F, TaggedCsum, Offset, Data, U) of
                    {ok, NewU1} ->
                        {ok, Err, NewU1};
                    Error ->
                        {Error, Err + 1, U}
                end
        end,
    {NewEof, infinity} = lists:last(NewU),
    {reply, Resp, State#state{writes = {T+1, NewErr},
                              eof_position = NewEof,
                              unwritten_bytes = NewU
                             }};

%% APPENDS

handle_call({append, _ClientMeta, _Extra, _Data}, _From,
            State = #state{wedged = true,
                           appends = {T, Err}
                          }) ->
    {reply, {error, wedged}, State#state{appends = {T+1, Err+1}}};

handle_call({append, ClientMeta, Extra, Data}, _From,
            State = #state{eof_position = EofP,
                           unwritten_bytes = U,
                           filename = F,
                           appends = {T, Err},
                           data_filehandle = FHd,
                           csum_filehandle = FHc
                          }) ->

    ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
    ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),

    {Resp, NewErr, NewU} =
        case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
            {error, {bad_csum, Bad}} ->
                lager:error("Bad checksum; client sent ~p, we computed ~p",
                            [ClientCsum, Bad]),
                {{error, bad_csum}, Err + 1, U};
            TaggedCsum ->
                case handle_write(FHd, FHc, F, TaggedCsum, EofP, Data, U) of
                    {ok, NewU1} ->
                        {{ok, F, EofP}, Err, NewU1};
                    Error ->
                        {Error, Err + 1, U}
                end
        end,
    {NewEof, infinity} = lists:last(NewU),
    {reply, Resp, State#state{appends = {T+1, NewErr},
                              eof_position = NewEof + Extra,
                              unwritten_bytes = NewU
                             }};

handle_call(Req, _From, State) ->
    lager:warning("Unknown call: ~p", [Req]),
    {reply, whoaaaaaaaaaaaa, State}.

% @private
handle_cast(Cast, State) ->
    lager:warning("Unknown cast: ~p", [Cast]),
    {noreply, State}.

% @private
handle_info(tick, State = #state{eof_position = Eof}) when Eof >= ?MAX_FILE_SIZE ->
    lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
                 [Eof, ?MAX_FILE_SIZE]),
    {stop, file_rollover, State};

%% XXX Is this a good idea? Need to think this through a bit.
handle_info(tick, State = #state{wedged = true}) ->
    {stop, wedged, State};

%% I dunno. This may not be a good idea, but it seems like if we're throwing lots of
%% errors, we ought to shut down and give up our file descriptors.
handle_info(tick, State = #state{
                    ops = Ops,
                    reads = {RT, RE},
                    writes = {WT, WE},
                    appends = {AT, AE}
                   }) when Ops > 100 andalso
                           trunc(((RE+WE+AE) / (RT+WT+AT)) * 100) > ?TOO_MANY_ERRORS_RATIO ->
    Errors = RE + WE + AE,
    lager:notice("Got ~p errors. Shutting down.", [Errors]),
    {stop, too_many_errors, State};

handle_info(tick, State = #state{
                    ticks = Ticks,
                    ops = Ops,
                    reads = {RT, _RE},
                    writes = {WT, _WE},
                    appends = {AT, _AE}}) when Ops == RT + WT + AT, Ticks == ?TICK_THRESHOLD ->
    lager:debug("Got 5 ticks with no new activity. Shutting down."),
    {stop, normal, State};

handle_info(tick, State = #state{
                    ticks = Ticks,
                    ops = Ops,
                    reads = {RT, _RE},
                    writes = {WT, _WE},
                    appends = {AT, _AE}}) when Ops == RT + WT + AT ->
    lager:debug("No new activity since last tick. Incrementing tick counter."),
    Tref = schedule_tick(),
    {noreply, State#state{tref = Tref, ticks = Ticks + 1}};

handle_info(tick, State = #state{
                    reads = {RT, _RE},
                    writes = {WT, _WE},
                    appends = {AT, _AE}
                   }) ->
    Ops = RT + WT + AT,
    lager:debug("Setting ops counter to ~p", [Ops]),
    Tref = schedule_tick(),
    {noreply, State#state{tref = Tref, ops = Ops}};

%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E /= EpochId ->
%    lager:notice("Wedge epoch ~p but ignoring because our epoch id is ~p", [EpochId, E]),
%    {noreply, State};

%handle_info({wedged, EpochId}, State = #state{epoch = E}) when E == EpochId ->
%    lager:notice("Wedge epoch ~p same as our epoch id ~p; we are wedged. Bummer.", [EpochId, E]),
%    {noreply, State#state{wedged = true}};

% flu1.erl:
% ProxyPid = get_proxy_pid(Filename),
% Are we wedged? if not
% machi_file_proxy:read(Pid, Offset, Length)
% otherwise -> error,wedged
%
% get_proxy_pid(Filename) ->
%   Pid = lookup_pid(Filename)
%   is_pid_alive(Pid)
%   Pid
%   if not alive then start one

handle_info(Req, State) ->
    lager:warning("Unknown info message: ~p", [Req]),
    {noreply, State}.

% @private
terminate(Reason, #state{filename = F,
                         data_filehandle = FHd,
                         csum_filehandle = FHc,
                         reads = {RT, RE},
                         writes = {WT, WE},
                         appends = {AT, AE}
                        }) ->
    lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]),
    lager:info("        Op    Tot/Error", []),
    lager:info("  Reads:  ~p/~p", [RT, RE]),
    lager:info(" Writes:  ~p/~p", [WT, WE]),
    lager:info("Appends:  ~p/~p", [AT, AE]),
    ok = file:sync(FHd),
    ok = file:sync(FHc),
    ok = file:close(FHd),
    ok = file:close(FHc),
    ok.

% @private
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Private functions

-spec schedule_tick() -> reference().
schedule_tick() ->
    erlang:send_after(?TICK, self(), tick).

-spec check_or_make_tagged_csum(Type :: non_neg_integer(),
                                Checksum :: binary(),
                                Data :: binary() ) -> binary() |
                                                      {error, {bad_csum, Bad :: binary()}}.
check_or_make_tagged_csum(?CSUM_TAG_NONE, _Csum, Data) ->
    %% We are making a checksum here
    Csum = machi_util:checksum_chunk(Data),
    machi_util:make_tagged_csum(server_sha, Csum);
check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
                                                  Tag == ?CSUM_TAG_SERVER_SHA ->
    Csum = machi_util:checksum_chunk(Data),
    case Csum =:= InCsum of
        true ->
            machi_util:make_tagged_csum(server_sha, Csum);
        false ->
            {error, {bad_csum, Csum}}
    end;
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
    lager:warning("Unknown checksum tag ~p", [OtherTag]),
    {error, bad_csum}.

encode_csum_file_entry(Offset, Size, TaggedCSum) ->
    Len = 8 + 4 + byte_size(TaggedCSum),
    [<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
     TaggedCSum].
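Each checksum file entry is length-prefixed so the file can be replayed sequentially at startup; schematically (a sketch; the 21-byte tagged checksum below assumes a 1-byte tag plus a 20-byte SHA-1, which this diff does not spell out):

    %% <<Len:8, Offset:64/big, Size:32/big, TaggedCSum/binary>>
    %% e.g. Offset = 1024, Size = 5, byte_size(TaggedCSum) = 21:
    %% Len = 8 + 4 + 21 = 33, so the entry occupies 1 + 33 = 34 bytes on disk.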
map_offsets_to_csums(CsumList) ->
    lists:foreach(fun insert_offsets/1, CsumList).

insert_offsets({Offset, Length, Checksum}) ->
    put({Offset, Length}, Checksum).

-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
parse_csum_file(Filename) ->
    %% using file:read_file works as long as the files are "small"
    try
        {ok, CsumData} = file:read_file(Filename),
        {DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
        Sort = lists:sort(DecodedCsums),
        case Sort of
            [] -> [{?MINIMUM_OFFSET, infinity}];
            _ ->
                map_offsets_to_csums(DecodedCsums),
                {First, _, _} = hd(Sort),
                build_unwritten_bytes_list(Sort, First, [])
        end
    catch
        _:{badmatch, {error, enoent}} ->
            [{?MINIMUM_OFFSET, infinity}]
    end.

-spec handle_read(FHd :: file:filehandle(),
                  Filename :: string(),
                  TaggedCsum :: undefined|binary(),
                  Offset :: non_neg_integer(),
                  Size :: non_neg_integer(),
                  Unwritten :: [byte_sequence()]
                 ) -> {ok, Bytes :: binary(), Csum :: binary()} |
                      eof |
                      {error, bad_csum} |
                      {error, partial_read} |
                      {error, not_written} |
                      {error, Other :: term() }.
% @private Attempt a read operation on the given offset and length.
% <ul>
% <li> If the byte range is not yet written, `{error, not_written}' is
% returned.</li>
% <li> If the checksum given does not match what comes off the disk,
% `{error, bad_csum}' is returned.</li>
% <li> If the number of bytes that comes off the disk is not the requested length,
% `{error, partial_read}' is returned.</li>
% <li> If the offset is at or beyond the current file boundary, `eof' is returned.</li>
% <li> If some kind of POSIX error occurs, the OTP version of that POSIX error
% tuple is returned.</li>
% </ul>
%
% On success, `{ok, Bytes, Checksum}' is returned.
handle_read(FHd, Filename, undefined, Offset, Size, U) ->
    handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U);

handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
    case is_byte_range_unwritten(Offset, Size, U) of
        true ->
            {error, not_written};
        false ->
            do_read(FHd, Filename, TaggedCsum, Offset, Size)
    end.

do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
    case file:pread(FHd, Offset, Size) of
        eof ->
            eof;
        {ok, Bytes} when byte_size(Bytes) == Size ->
            {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
            case check_or_make_tagged_csum(Tag, Ck, Bytes) of
                {error, Bad} ->
                    lager:error("Bad checksum; got ~p, expected ~p",
                                [Bad, Ck]),
                    {error, bad_csum};
                TaggedCsum ->
                    {ok, Bytes, TaggedCsum};
                %% XXX FIXME: Should we return something other than
                %% {ok, ....} in this case?
                OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
                    {ok, Bytes, OtherCsum}
            end;
        {ok, Partial} ->
            lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
                        [Filename, Offset, Size, byte_size(Partial)]),
            {error, partial_read};
        Other ->
            lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
                        [Filename, Offset, Size, Other]),
            {error, Other}
    end.

-spec handle_write( FHd :: file:filehandle(),
                    FHc :: file:filehandle(),
                    Filename :: string(),
                    TaggedCsum :: binary(),
                    Offset :: non_neg_integer(),
                    Data :: binary(),
                    Unwritten :: [byte_sequence()]
                  ) -> {ok, NewU :: [byte_sequence()]} |
                       {error, written} |
                       {error, Reason :: term()}.
% @private Implements the write and append operation. The first task is to
% determine whether the offset and data size have already been written. If
% not, the write is allowed to proceed. A special case is made when an offset
% and data size match a stored checksum. In that case we read the data off the
% disk, validate the checksum and return a "fake" ok response as if the write
% had been performed when it hasn't really.
%
% If a write proceeds, the offset, size and checksum are written to a metadata
% file, and the internal list of unwritten bytes is modified to reflect the
% just-performed write. This is then returned to the caller as
% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
handle_write(FHd, FHc, Filename, TaggedCsum, Offset, Data, U) ->
    Size = iolist_size(Data),

    case is_byte_range_unwritten(Offset, Size, U) of
        false ->
            case get({Offset, Size}) of
                undefined ->
                    {error, written};
                TaggedCsum ->
                    case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
                        eof ->
                            lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
                                          [Offset, Filename]),
                            {error, server_insanity};
                        {ok, _, _} ->
                            {ok, U};
                        _ ->
                            {error, written}
                    end;
                OtherCsum ->
                    %% Got a checksum, but it doesn't match the data block's
                    lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to write has checksum ~p",
                                [Offset, Filename, OtherCsum, TaggedCsum]),
                    {error, written}
            end;
        true ->
            try
                do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U)
            catch
                %%% XXX FIXME: be more specific on badmatch that might
                %%% occur around line 593 when we write the checksum
                %%% file entry for the data blob we just put on the disk
                error:Reason ->
                    {error, Reason}
            end
    end.

% @private Implements the disk writes for both the write and append
% operation.
-spec do_write( FHd :: file:descriptor(),
                FHc :: file:descriptor(),
                Filename :: string(),
                TaggedCsum :: binary(),
                Offset :: non_neg_integer(),
                Size :: non_neg_integer(),
                Data :: binary(),
                Unwritten :: [byte_sequence()]
              ) -> {ok, NewUnwritten :: [byte_sequence()]} |
                   {error, Reason :: term()}.
do_write(FHd, FHc, Filename, TaggedCsum, Offset, Size, Data, U) ->
    case file:pwrite(FHd, Offset, Data) of
        ok ->
            lager:debug("Successful write in file ~p at offset ~p, length ~p",
                        [Filename, Offset, Size]),
            EncodedCsum = encode_csum_file_entry(Offset, Size, TaggedCsum),
            ok = file:write(FHc, EncodedCsum),
            put({Offset, Size}, TaggedCsum),
            NewU = update_unwritten(Offset, Size, U),
            lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
                        [Filename, NewU]),
            {ok, NewU};
        Other ->
            lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
                        [Other, Filename, Offset, Size]),
            {error, Other}
    end.

-spec is_byte_range_unwritten( Offset :: non_neg_integer(),
                               Size :: pos_integer(),
                               Unwritten :: [byte_sequence()] ) -> boolean().
% @private Given an offset and a size, return `true' if a byte range has
% <b>not</b> been written. Otherwise, return `false'.
is_byte_range_unwritten(Offset, Size, Unwritten) ->
    case length(Unwritten) of
        0 ->
            lager:critical("Unwritten byte list has 0 entries! This should never happen."),
            false;
        1 ->
            {Eof, infinity} = hd(Unwritten),
            Offset >= Eof;
        _ ->
            case lookup_unwritten(Offset, Size, Unwritten) of
                {ok, _} -> true;
                not_found -> false
            end
    end.

-spec lookup_unwritten( Offset :: non_neg_integer(),
                        Size :: pos_integer(),
                        Unwritten :: [byte_sequence()]
                      ) -> {ok, byte_sequence()} | not_found.
% @private Given an offset and a size, scan the list of unwritten bytes and
% look for a "hole" where a write might be allowed, if any exist. If a
% suitable byte sequence is found, a tuple of `{ok, {Position, Space}}' is
% returned. `not_found' is returned if no suitable space is located.
lookup_unwritten(_Offset, _Size, []) ->
    not_found;
lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos ->
    {ok, H};
lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest])
  when Offset >= Pos andalso Offset < Pos+Space
       andalso Size =< (Space - (Offset - Pos)) ->
    {ok, H};
lookup_unwritten(Offset, Size, [_H|Rest]) ->
    %% These are not the droids you're looking for.
    lookup_unwritten(Offset, Size, Rest).

%%% if the pos is greater than offset + size then we're done. End early.

-spec update_unwritten( Offset :: non_neg_integer(),
                        Size :: pos_integer(),
                        Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].
% @private Given an offset, a size and the unwritten byte list, return an updated
% and sorted unwritten byte list accounting for any completed write operation.
update_unwritten(Offset, Size, Unwritten) ->
    case lookup_unwritten(Offset, Size, Unwritten) of
        not_found ->
            lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"),
            Unwritten;
        {ok, {Offset, Size}} ->
            %% we neatly filled in our hole...
            lists:keydelete(Offset, 1, Unwritten);
        {ok, S={Pos, _}} ->
            lists:sort(lists:keydelete(Pos, 1, Unwritten) ++
                       update_byte_range(Offset, Size, S))
    end.

-spec update_byte_range( Offset :: non_neg_integer(),
                         Size :: pos_integer(),
                         Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()].
% @private Given an offset and size and a byte sequence tuple where a
% write took place, return a list of updates to the list of unwritten bytes
% accounting for the space occupied by the just completed write.
update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof ->
    [{Offset + Size, infinity}];
update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof ->
    [{Eof, (Offset - Eof)}, {Offset+Size, infinity}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space ->
    [{Offset + Size, Space - Size}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos ->
    [{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}].
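A worked example of the hole bookkeeping above (values are illustrative):

    %% Writing 10 bytes at the start of the infinite tail just advances it:
    update_byte_range(1024, 10, {1024, infinity}) -> [{1034, infinity}]
    %% Writing past the tail's start leaves a finite hole behind:
    update_byte_range(2000, 10, {1024, infinity}) -> [{1024, 976}, {2010, infinity}]
    %% Writing into the middle of a finite hole splits it in two:
    update_byte_range(2000, 10, {1990, 30}) -> [{1990, 10}, {2010, 10}]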
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
                                                 Size :: pos_integer(),
                                                 Checksum :: binary() }],
                                  LastOffset :: non_neg_integer(),
                                  Acc :: list() ) -> [byte_sequence()].
% @private Given a <b>sorted</b> list of checksum data tuples, return a sorted
% list of unwritten byte ranges. The output list <b>always</b> has at least one
% entry: the last tuple in the list is guaranteed to be the current end of
% bytes written to a particular file with the special space moniker
% `infinity'.
build_unwritten_bytes_list([], Last, Acc) ->
    NewAcc = [ {Last, infinity} | Acc ],
    lists:reverse(NewAcc);
build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffset, Acc) when
      CurrentOffset /= LastOffset ->
    Hole = CurrentOffset - LastOffset,
    build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
    build_unwritten_bytes_list(Rest, CO + CS, Acc).
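For example, two written extents with a gap between them produce one finite hole plus the infinite tail (a sketch; C1/C2 stand for checksum binaries):

    build_unwritten_bytes_list([{1024,10,C1}, {1044,6,C2}], 1024, []) ->
        [{1034, 10}, {1050, infinity}]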
src/machi_file_proxy_sup.erl (new file, 46 lines)
@@ -0,0 +1,46 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc This is the main supervisor for the file proxies.
-module(machi_file_proxy_sup).
-behaviour(supervisor).

%% public API
-export([
    start_link/0,
    start_proxy/2
]).

%% supervisor callback
-export([
    init/1
]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

start_proxy(Filename, DataDir) ->
    supervisor:start_child(?MODULE, [Filename, DataDir]).

init([]) ->
    SupFlags = {simple_one_for_one, 1000, 10},
    ChildSpec = {unused, {machi_file_proxy, start_link, []},
                 temporary, 2000, worker, [machi_file_proxy]},
    {ok, {SupFlags, [ChildSpec]}}.
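Because the child spec is simple_one_for_one, proxies are started on demand, one per file; roughly (a sketch with hypothetical arguments):

    {ok, _Sup} = machi_file_proxy_sup:start_link(),
    {ok, Pid} = machi_file_proxy_sup:start_proxy("some_file", "./data"),
    {ok, _File, _Offset} = machi_file_proxy:append(Pid, <<"data">>).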
src/machi_flu1.erl (1731 lines changed; diff suppressed because it is too large)
src/machi_flu_filename_mgr.erl (new file, 167 lines)
@@ -0,0 +1,167 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%
%% @doc This process is responsible for managing filenames assigned to
%% prefixes.
%%
%% Supported operations include finding the "current" filename assigned to
%% a prefix, incrementing the sequence number to generate a new file name,
%% and listing all data files assigned to a given prefix.
%%
%% All prefixes should have the form of `{prefix, P}'. Single filename
%% return values have the form of `{file, F}'.
%%
%% <h2>Finding the current file associated with a sequence</h2>
%% First it looks up the sequence number from the prefix name. If
%% no sequence file is found, it uses 0 as the sequence number and searches
%% for a matching file with the prefix and 0 as the sequence number.
%% If no file is found, then it generates a new filename by incorporating
%% the given prefix, a randomly generated (v4) UUID and 0 as the
%% sequence number.
%%
%% If the sequence number is > 0, then the process scans the filesystem
%% looking for a filename which matches the prefix and given sequence number and
%% returns that.

-module(machi_flu_filename_mgr).
-behavior(gen_server).

-export([
    start_link/1,
    find_or_make_filename_from_prefix/1,
    increment_prefix_sequence/1,
    list_files_by_prefix/1
]).

%% gen_server callbacks
-export([
    init/1,
    handle_cast/2,
    handle_call/3,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-define(TIMEOUT, 10 * 1000).

%% public API

start_link(DataDir) when is_list(DataDir) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []).

-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) ->
        {file, Filename :: string()} | {error, Reason :: term() } | timeout.
% @doc Find the latest available filename, or make one from a prefix. A prefix
% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
% tuple in the form of `{file, F}' or an `{error, Reason}'.
find_or_make_filename_from_prefix({prefix, Prefix}) ->
    gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT);
find_or_make_filename_from_prefix(Other) ->
    lager:error("~p is not a valid prefix.", [Other]),
    error(badarg).

-spec increment_prefix_sequence( Prefix :: {prefix, string()} ) ->
        ok | {error, Reason :: term() } | timeout.
% @doc Increment the sequence counter for a given prefix. Prefix should
% be in the form of `{prefix, P}'.
increment_prefix_sequence({prefix, Prefix}) ->
    gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT);
increment_prefix_sequence(Other) ->
    lager:error("~p is not a valid prefix.", [Other]),
    error(badarg).

-spec list_files_by_prefix( Prefix :: {prefix, string()} ) ->
        [ file:name() ] | timeout | {error, Reason :: term() }.
% @doc Given a prefix in the form of `{prefix, P}', return
% all the data files associated with that prefix. Returns
% a list.
list_files_by_prefix({prefix, Prefix}) ->
    gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT);
list_files_by_prefix(Other) ->
    lager:error("~p is not a valid prefix.", [Other]),
    error(badarg).
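A typical call sequence from a FLU would be (a sketch; the exact filename layout comes from machi_util:make_data_filename/4, which is not shown in this diff):

    {file, F} = machi_flu_filename_mgr:find_or_make_filename_from_prefix({prefix, "foo"}),
    ok = machi_flu_filename_mgr:increment_prefix_sequence({prefix, "foo"}),
    Files = machi_flu_filename_mgr:list_files_by_prefix({prefix, "foo"}).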
%% gen_server API

init([DataDir]) ->
    {ok, DataDir}.

handle_cast(Req, State) ->
    lager:warning("Got unknown cast ~p", [Req]),
    {noreply, State}.

handle_call({find_filename, Prefix}, _From, DataDir) ->
    N = machi_util:read_max_filenum(DataDir, Prefix),
    File = case find_file(DataDir, Prefix, N) of
        [] ->
            {F, _} = machi_util:make_data_filename(
                       DataDir,
                       Prefix,
                       generate_uuid_v4_str(),
                       N),
            F;
        [H] -> H;
        [Fn | _ ] = L ->
            lager:warning(
              "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
              [Prefix, N, L]),
            Fn
    end,
    {reply, {file, File}, DataDir};
handle_call({increment_sequence, Prefix}, _From, DataDir) ->
    ok = machi_util:increment_max_filenum(DataDir, Prefix),
    {reply, ok, DataDir};
handle_call({list_files, Prefix}, From, DataDir) ->
    spawn(fun() ->
              L = list_files(DataDir, Prefix),
              gen_server:reply(From, L)
          end),
    {noreply, DataDir};

handle_call(Req, From, State) ->
    lager:warning("Got unknown call ~p from ~p", [Req, From]),
    {reply, hoge, State}.

handle_info(Info, State) ->
    lager:warning("Got unknown info ~p", [Info]),
    {noreply, State}.

terminate(Reason, _State) ->
    lager:info("Shutting down because ~p", [Reason]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% private

%% Quoted from https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
%% MIT License
generate_uuid_v4_str() ->
    <<A:32, B:16, C:16, D:16, E:48>> = crypto:strong_rand_bytes(16),
    io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
                  [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).

find_file(DataDir, Prefix, N) ->
    {_Filename, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", N),
    filelib:wildcard(Path).

list_files(DataDir, Prefix) ->
    {F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"),
    filelib:wildcard(F, filename:dirname(Path)).
src/machi_flu_listener.erl (new file, 190 lines)
@@ -0,0 +1,190 @@
% 1. start file proxy supervisor
% 2. start projection store
% 3. start listener
-module(machi_flu_listener).
-behaviour(ranch_protocol).

-export([start_link/4]).
-export([init/4]).

-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").

-record(state, {
    pb_mode,
    high_clnt,
    proj_store,
    etstab,
    epoch_id,
    flu_name
}).

-define(SERVER_CMD_READ_TIMEOUT, 600 * 1000).

start_link(Ref, Socket, Transport, Opts) ->
    Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
    {ok, Pid}.

init(Ref, Socket, Transport, _Opts = []) ->
    ok = ranch:accept_ack(Ref),
    %% By default, ranch sets sockets to
    %% {active, false}, {packet, raw}, {reuseaddr, true}
    ok = Transport:setopts(Socket, ?PB_PACKET_OPTS),
    loop(Socket, Transport, #state{}).
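As a ranch_protocol, this module gets wired up when the listener is started; roughly (a sketch assuming ranch 1.1.0's API and a hypothetical port):

    {ok, _} = ranch:start_listener(machi_flu_listener, 10,
                                   ranch_tcp, [{port, 32900}],
                                   machi_flu_listener, []).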
loop(Socket, Transport, S) ->
    case Transport:recv(Socket, 0, ?SERVER_CMD_READ_TIMEOUT) of
        {ok, Bin} ->
            {RespBin, S2} =
                case machi_pb:decode_mpb_ll_request(Bin) of
                    LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
                        {R, NewS} = do_pb_ll_request(LL_req, S),
                        {maybe_encode_response(R), mode(low, NewS)};
                    _ ->
                        HL_req = machi_pb:decode_mpb_request(Bin),
                        1 = HL_req#mpb_request.do_not_alter,
                        {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
                        {machi_pb:encode_mpb_response(R), mode(high, NewS)}
                end,
            if RespBin == async_no_response ->
                    ok;
               true ->
                    ok = Transport:send(Socket, RespBin)
            end,
            loop(Socket, Transport, S2);
        {error, SockError} ->
            lager:error("Socket error ~w", [SockError]),
            (catch Transport:close(Socket))
    end.

make_high_clnt(#state{high_clnt=undefined}=S) ->
    {ok, Proj} = machi_projection_store:read_latest_projection(
                   S#state.proj_store, private),
    Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
                                     Proj#projection_v1.members_dict)],
    {ok, Clnt} = machi_cr_client:start_link(Ps),
    S#state{high_clnt=Clnt};
make_high_clnt(S) ->
    S.

maybe_encode_response(async_no_response=X) ->
    X;
maybe_encode_response(R) ->
    machi_pb:encode_mpb_ll_response(R).

mode(Mode, #state{pb_mode=undefined}=S) ->
    S#state{pb_mode=Mode};
mode(_, S) ->
    S.

do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
    Result = {high_error, 41, "Low protocol request while in high mode"},
    {machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
do_pb_ll_request(PB_request, S) ->
    Req = machi_pb_translate:from_pb_request(PB_request),
    {ReqID, Cmd, Result, S2} =
        case Req of
            {RqID, {LowCmd, _}=CMD}
              when LowCmd == low_proj;
                   LowCmd == low_wedge_status; LowCmd == low_list_files ->
                %% Skip wedge check for projection commands!
                %% Skip wedge check for these unprivileged commands
                {Rs, NewS} = do_pb_ll_request3(CMD, S),
                {RqID, CMD, Rs, NewS};
            {RqID, CMD} ->
                EpochID = element(2, CMD), % by common convention
                {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
                {RqID, CMD, Rs, NewS}
        end,
    {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.

do_pb_ll_request2(EpochID, CMD, S) ->
    {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
    if Wedged_p == true ->
            {{error, wedged}, S#state{epoch_id=CurrentEpochID}};
       is_tuple(EpochID)
       andalso
       EpochID /= CurrentEpochID ->
            {Epoch, _} = EpochID,
            {CurrentEpoch, _} = CurrentEpochID,
            if Epoch < CurrentEpoch ->
                    ok;
               true ->
                    %% We're at same epoch # but different checksum, or
                    %% we're at a newer/bigger epoch #.
                    wedge_myself(S#state.flu_name, CurrentEpochID),
                    ok
            end,
            {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
       true ->
            do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
    end.

do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
    {Msg, S};
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
    {-6, S};
do_pb_ll_request3(Cmd, S) ->
    {execute_cmd(Cmd), S}.
%do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
%                   CSum, ChunkExtra}, S) ->
%    {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
%                            ChunkExtra, S), S};
%do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
%                   CSum}, S) ->
%    {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
%do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
%    {do_server_read_chunk(File, Offset, Size, Opts, S), S};
%do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) ->
%    {do_server_checksum_listing(File, S), S};
%do_pb_ll_request3({low_list_files, _EpochID}, S) ->
%    {do_server_list_files(S), S};
%do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
%    {do_server_wedge_status(S), S};
%do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) ->
%    {do_server_delete_migration(File, S), S};
%do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) ->
%    {do_server_trunc_hack(File, S), S};
%do_pb_ll_request3({low_proj, PCMD}, S) ->
%    {do_server_proj_request(PCMD, S), S}.
execute_cmd(_Cmd) ->
    ok.

do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
    Result = {low_error, 41, "High protocol request while in low mode"},
    {machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
do_pb_hl_request(PB_request, S) ->
    {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
    {Result, S2} = do_pb_hl_request2(Cmd, S),
    {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.

do_pb_hl_request2({high_echo, Msg}, S) ->
    {Msg, S};
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
    {-77, S};
do_pb_hl_request2({high_append_chunk, _todoPK, Prefix, ChunkBin, TaggedCSum,
                   ChunkExtra}, #state{high_clnt=Clnt}=S) ->
    Chunk = {TaggedCSum, ChunkBin},
    Res = machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk,
                                             ChunkExtra),
    {Res, S};
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
                  #state{high_clnt=Clnt}=S) ->
    Chunk = {TaggedCSum, ChunkBin},
    Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
    {Res, S};
do_pb_hl_request2({high_read_chunk, File, Offset, Size},
                  #state{high_clnt=Clnt}=S) ->
    Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
    {Res, S};
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
    Res = machi_cr_client:checksum_list(Clnt, File),
    {Res, S};
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
    Res = machi_cr_client:list_files(Clnt),
    {Res, S}.

wedge_myself(_, _) -> ok.
src/machi_flu_manager.erl (new file, 149 lines)
@@ -0,0 +1,149 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(machi_flu_manager).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include("machi_flu.hrl"). %% contains state record
|
||||||
|
|
||||||
|
%% Public API
|
||||||
|
-export([
|
||||||
|
start_link/1,
|
||||||
|
start/1,
|
||||||
|
stop/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([
|
||||||
|
init/1,
|
||||||
|
handle_cast/2,
|
||||||
|
handle_call/3,
|
||||||
|
handle_info/2,
|
||||||
|
terminate/2,
|
||||||
|
code_change/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
%% PUBLIC API
|
||||||
|
|
||||||
|
start_link(S = #state{flu_name = Name}) ->
|
||||||
|
gen_server:start_link({local, Name}, ?MODULE, [S], []).
|
||||||
|
|
||||||
|
%% TODO Make this a functional thing
|
||||||
|
start(_FluName) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% TODO Make this functional
|
||||||
|
stop(_) -> ok.
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
init(S = #state{flu_name = N, epoch_id = EpochId, wedged = W}) ->
|
||||||
|
Tid = ets:new(make_name(N, "_epoch"), [set, protected, named_table, {read_concurrency, true}]),
|
||||||
|
true = ets:insert(Tid, {epoch, {W, EpochId}}),
|
||||||
|
{ok, S#state{etstab=Tid}}.
|
handle_cast(Req, S) ->
    lager:warning("Unexpected cast ~p", [Req]),
    {noreply, S}.

handle_call(Req, _From, S) ->
    lager:warning("Unexpected call ~p", [Req]),
    {reply, unexpected, S}.

handle_info({wedge_myself, _EpochId}, S = #state{wedged = true}) ->
    lager:debug("Request to wedge myself, but I'm already wedged. Ignoring."),
    {noreply, S};
handle_info({wedge_myself, EpochId}, S = #state{flu_name = N,
                                                wedged = false,
                                                epoch_id = E,
                                                etstab = Tid}) when EpochId == E ->
    true = ets:insert(Tid, {epoch, {true, E}}),
    kick_chain_manager(N),
    {noreply, S#state{wedged=true}};

handle_info({wedge_state_change, Bool, {NewEpoch, _}},
            S = #state{epoch_id = undefined, etstab=Tid}) ->
    true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}),
    {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}};
handle_info({wedge_state_change, Bool, {NewEpoch, _}},
            S = #state{epoch_id = E, etstab = Tid}) when NewEpoch >= E ->
    true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}),
    {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}};
handle_info(M = {wedge_state_change, _Bool, {NewEpoch, _}},
            S = #state{epoch_id = E}) when NewEpoch < E ->
    lager:debug("Wedge state change message ~p, but my epoch id is higher (~p). Ignoring.",
                [M, E]),
    {noreply, S};

handle_info({wedge_status, From}, S = #state{wedged = W, epoch_id = E}) ->
    From ! {wedge_status_reply, W, E},
    {noreply, S};
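
%% For illustration (not part of this module): a caller can poll the
%% wedge state with a plain message, e.g.
%%
%%   FluPid ! {wedge_status, self()},
%%   receive {wedge_status_reply, Wedged, EpochId} -> {Wedged, EpochId} end.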
handle_info({seq_append, From, _Prefix, _Chunk, _Csum, _Extra, _EpochId},
            S = #state{wedged = true}) ->
    From ! wedged,
    {noreply, S};

handle_info({seq_append, From, Prefix, Chunk, Csum, Extra, EpochId},
            S = #state{epoch_id = EpochId}) ->
    handle_append(From, Prefix, Chunk, Csum, Extra),
    {noreply, S};

handle_info(Info, S) ->
    lager:warning("Unexpected info ~p", [Info]),
    {noreply, S}.

terminate(Reason, _S) ->
    lager:info("Terminating because ~p", [Reason]),
    ok.

code_change(_Old, S, _Extra) ->
    {ok, S}.

%% private

kick_chain_manager(Name) ->
    Chmgr = machi_chain_manager1:make_chmgr_regname(Name),
    spawn(fun() ->
                  catch machi_chain_manager1:trigger_react_to_env(Chmgr)
          end).

handle_append(From, Prefix, Chunk, Csum, Extra) ->
    spawn(fun() ->
                  dispatch_append(From, Prefix, Chunk, Csum, Extra)
          end).

dispatch_append(From, Prefix, Chunk, Csum, Extra) ->
    %% NB: machi_flu_metadata_mgr:start_proxy_pid/1 currently
    %% pattern-matches on a {file, Filename} tuple; here we hand it a
    %% bare Prefix.
    {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(Prefix),
    {Tag, CS} = machi_util:unmake_tagged_csum(Csum),
    try
        {ok, Filename, Offset} = machi_file_proxy:append(Pid,
                                     [{client_csum_tag, Tag}, {client_csum, CS}],
                                     Extra, Chunk),
        From ! {assignment, Offset, Filename},
        exit(normal)
    catch
        _Type:Reason ->
            lager:error("Could not append chunk to prefix ~p because ~p",
                        [Prefix, Reason]),
            exit(Reason)
    end.

make_name(N, Suffix) ->
    %% ets:new/2 with the named_table option needs an atom, not a string
    list_to_atom(atom_to_list(N) ++ Suffix).
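Together these clauses implement the sequencer side of the append protocol: a client sends a seq_append message and gets back either wedged or an {assignment, Offset, Filename} reply. A minimal sketch of the client side, assuming the manager is registered under FluName and TaggedCsum is a checksum that machi_util:unmake_tagged_csum/1 can decode (names here are illustrative):

    FluName ! {seq_append, self(), Prefix, Chunk, TaggedCsum, Extra, EpochId},
    receive
        {assignment, Offset, Filename} -> {ok, Filename, Offset};
        wedged                         -> {error, wedged}
    end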
217
src/machi_flu_metadata_mgr.erl
Normal file

@@ -0,0 +1,217 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc This is a metadata service for the machi FLU which currently
%% tracks the mappings between filenames and file proxies.
%%
%% The service takes a given hash space and spreads it out over a
%% pool of N processes which are each responsible for 1/Nth of the
%% hash space. When a user requests an operation on a particular
%% file, the filename is hashed into the hash space and the request
%% is forwarded to the particular manager responsible for that slice
%% of the hash space.
%%
%% The current hash implementation is `erlang:phash2/1' which has
%% a range of 0..2^27-1, or 134,217,727.
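%%
%% For illustration, routing a filename to its manager (using the
%% ?HASH and ?MAX_MGRS definitions below) works out to:
%%
%%   Hash = erlang:phash2("pre^flu1^12"),    %% 0..2^27-1
%%   N    = Hash rem 10,                     %% ?MAX_MGRS defaults to 10
%%   Mgr  = list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)).
%%
%% (The example filename is hypothetical.)
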
-module(machi_flu_metadata_mgr).
-behaviour(gen_server).

-define(MAX_MGRS, 10). %% number of managers to start by default.
-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
-define(TIMEOUT, 10 * 1000). %% 10 second timeout

-record(state, {name :: atom(),
                datadir :: string(),
                tid :: ets:tid()
               }).

%% This record goes in the ets table where prefix is the key
-record(md, {filename :: string(),
             proxy_pid :: undefined|pid(),
             mref :: undefined|reference() %% monitor ref for file proxy
            }).

%% public api
-export([
         start_link/2,
         lookup_manager_pid/1,
         lookup_proxy_pid/1,
         start_proxy_pid/1,
         stop_proxy_pid/1
        ]).

%% gen_server callbacks
-export([
         init/1,
         handle_cast/2,
         handle_call/3,
         handle_info/2,
         terminate/2,
         code_change/3
        ]).

%% Public API

start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) ->
    gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []).

lookup_manager_pid({file, Filename}) ->
    whereis(get_manager_atom(Filename)).

lookup_proxy_pid({file, Filename}) ->
    gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT).

start_proxy_pid({file, Filename}) ->
    gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT).

stop_proxy_pid({file, Filename}) ->
    gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
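
%% Example usage of the public API (a sketch; the filename is hypothetical):
%%
%%   {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid({file, "pre^flu1^12"}),
%%   Pid       = machi_flu_metadata_mgr:lookup_proxy_pid({file, "pre^flu1^12"}),
%%   ok        = machi_flu_metadata_mgr:stop_proxy_pid({file, "pre^flu1^12"}).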
%% gen_server callbacks

init([Name, DataDir]) ->
    Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
    {ok, #state{ name = Name, datadir = DataDir, tid = Tid}}.

handle_cast(Req, State) ->
    lager:warning("Got unknown cast ~p", [Req]),
    {noreply, State}.

handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
    Reply = case lookup_md(Tid, Filename) of
                not_found -> undefined;
                R -> R#md.proxy_pid
            end,
    {reply, Reply, State};

handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) ->
    NewR = case lookup_md(Tid, Filename) of
               not_found ->
                   start_file_proxy(D, Filename);
               #md{ proxy_pid = undefined } = R0 ->
                   start_file_proxy(D, R0);
               #md{ proxy_pid = _Pid } = R1 ->
                   R1
           end,
    update_ets(Tid, NewR),
    {reply, {ok, NewR#md.proxy_pid}, State};

handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
    case lookup_md(Tid, Filename) of
        not_found ->
            ok;
        #md{ proxy_pid = undefined } ->
            ok;
        #md{ proxy_pid = Pid, mref = M } = R ->
            demonitor(M, [flush]),
            machi_file_proxy:stop(Pid),
            update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined })
    end,
    {reply, ok, State};

handle_call(Req, From, State) ->
    lager:warning("Got unknown call ~p from ~p", [Req, From]),
    {reply, hoge, State}.

handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) ->
    lager:debug("file proxy ~p shutdown normally", [Pid]),
    clear_ets(Tid, Mref),
    {noreply, State};

handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ tid = Tid }) ->
    lager:info("file proxy ~p shutdown because of file rollover", [Pid]),
    R = get_md_record_by_mref(Tid, Mref),
    [Prefix | _Rest] = machi_util:parse_filename({file, R#md.filename}),

    %% We only increment the counter here. The filename will be generated
    %% on the next append request to that prefix, and since that filename
    %% will have a new sequence number it will probably be associated with
    %% a different metadata manager. That's why we don't generate a new
    %% file name immediately and use it to start a new file proxy.
    ok = machi_flu_filename_mgr:increment_prefix_sequence({prefix, Prefix}),

    %% Purge our ets table of this entry completely, since the new
    %% filename (whenever it comes) will likely live in a different
    %% manager than us.
    purge_ets(Tid, R),
    {noreply, State};
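
%% The prefix recovery above leans on the new "^" delimiter; e.g.
%% machi_util:parse_filename({file, "pre^flu1^12"}) returns
%% ["pre", "flu1", "12"], so Prefix is bound to "pre".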
handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) ->
    lager:error("file proxy ~p shutdown because it's wedged", [Pid]),
    clear_ets(Tid, Mref),
    {noreply, State};
handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) ->
    lager:error("file proxy ~p shutdown because ~p", [Pid, Error]),
    clear_ets(Tid, Mref),
    {noreply, State};

handle_info(Info, State) ->
    lager:warning("Got unknown info ~p", [Info]),
    {noreply, State}.

terminate(Reason, _State) ->
    lager:info("Shutting down because ~p", [Reason]),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Private functions

compute_hash(Data) ->
    ?HASH(Data).

compute_worker(Hash) ->
    Hash rem ?MAX_MGRS.

build_metadata_mgr_name(N) when is_integer(N) ->
    list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)).

get_manager_atom(Data) ->
    build_metadata_mgr_name(compute_worker(compute_hash(Data))).

lookup_md(Tid, Data) ->
    case ets:lookup(Tid, Data) of
        [] -> not_found;
        [R] -> R
    end.

start_file_proxy(D, R = #md{filename = F} ) ->
    {ok, Pid} = machi_file_proxy_sup:start_proxy(D, F),
    Mref = monitor(process, Pid),
    R#md{ proxy_pid = Pid, mref = Mref };
start_file_proxy(D, Filename) ->
    start_file_proxy(D, #md{ filename = Filename }).

update_ets(Tid, R) ->
    ets:insert(Tid, R).

clear_ets(Tid, Mref) ->
    R = get_md_record_by_mref(Tid, Mref),
    update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }).

purge_ets(Tid, R) ->
    ok = ets:delete_object(Tid, R).

get_md_record_by_mref(Tid, Mref) ->
    [R] = ets:match_object(Tid, {md, '_', '_', Mref}),
    R.
src/machi_util.erl

@@ -36,7 +36,7 @@
     make_projection_filename/2,
     read_max_filenum/2, increment_max_filenum/2,
     info_msg/2, verb/1, verb/2,
-    mbytes/1,
+    mbytes/1, parse_filename/1,
     %% TCP protocol helpers
     connect/2, connect/3,
     %% List twiddling

@@ -89,8 +89,7 @@ make_checksum_filename(DataDir, FileName) ->
 -spec make_data_filename(string(), string(), atom()|string()|binary(), integer()) ->
         {binary(), string()}.
 make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
-    File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
-                                                 [Prefix, SequencerName, FileNum])),
+    File = erlang:iolist_to_binary(string:join([Prefix, SequencerName, integer_to_list(FileNum)], ?FN_DELIMITER)),
     FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
     {File, FullPath}.

@@ -262,6 +261,10 @@ mbytes(0) ->
 mbytes(Size) ->
     lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])).
 
+-spec parse_filename( Filename :: {file, string()} ) -> [ string() ].
+parse_filename({file, F}) ->
+    string:tokens(F, ?FN_DELIMITER).
+
 %% @doc Log an 'info' level message.
 -spec info_msg(string(), list()) -> term().
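With ?FN_DELIMITER defined as "^" in machi.hrl, data file names are now built and parsed with the same delimiter, which is what lets machi_flu_metadata_mgr recover a prefix after a file rollover. A sketch of the round trip, with illustrative values and string (not atom) arguments:

    {File, FullPath} = machi_util:make_data_filename("./data", "pre", "flu1", 12),
    %% File = <<"pre^flu1^12">>, FullPath = "./data/data/pre^flu1^12"
    ["pre", "flu1", "12"] = machi_util:parse_filename({file, "pre^flu1^12"}).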
328
test/machi_file_proxy_eqc.erl
Normal file

@@ -0,0 +1,328 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(machi_file_proxy_eqc).

-ifdef(TEST).
-ifdef(EQC).
-compile(export_all).
-include("machi.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eunit/include/eunit.hrl").

-define(QC_OUT(P),
        eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).

%% EUNIT TEST DEFINITION
eqc_test_() ->
    {timeout, 60,
     {spawn,
      [
       {timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))}
      ]
     }}.

%% SHELL HELPERS
test() ->
    test(100).

test(N) ->
    quickcheck(numtests(N, prop_ok())).

check() ->
    check(prop_ok(), current_counterexample()).

%% GENERATORS

csum_type() ->
    elements([?CSUM_TAG_NONE, ?CSUM_TAG_CLIENT_SHA, ?CSUM_TAG_SERVER_SHA]).

csum(Type, Binary) ->
    case Type of
        ?CSUM_TAG_NONE -> <<>>;
        _ -> machi_util:checksum_chunk(Binary)
    end.

position(P) ->
    ?LET(O, offset(), P + O).

offset() ->
    ?SUCHTHAT(X, int(), X >= 0).

offset_base() ->
    elements([4096, 6144, 7168, 8192, 20480, 100000, 1000000]).

big_offset() ->
    ?LET(P, int(), ?LET(X, offset_base(), P+X)).

len() ->
    ?SUCHTHAT(X, int(), X >= 1).

data_with_csum() ->
    ?LET({B,T},{eqc_gen:largebinary(), csum_type()}, {B,T, csum(T, B)}).
    %?LET({B,T},{eqc_gen:binary(), csum_type()}, {B,T, csum(T, B)}).

data_with_csum(Limit) ->
    %?LET({B,T},{?LET(S, Limit, eqc_gen:largebinary(S)), csum_type()}, {B,T, csum(T, B)}).
    ?LET({B,T},{?LET(S, Limit, eqc_gen:binary(S)), csum_type()}, {B,T, csum(T, B)}).
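
%% Both generators yield {Binary, TagType, Csum} triples: when the tag is
%% ?CSUM_TAG_NONE the checksum is the empty binary, otherwise
%% machi_util:checksum_chunk/1 computes it (the tag names suggest SHA).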
intervals([]) ->
    [];
intervals([N]) ->
    [{N, choose(1,150)}];
intervals([A,B|T]) ->
    [{A, choose(1, B-A)}|intervals([B|T])].
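
%% For instance, intervals([1024, 1200]) is
%%   [{1024, choose(1,176)}, {1200, choose(1,150)}]
%% i.e. non-overlapping {Offset, Len} candidates at or above ?MINIMUM_OFFSET.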
interval_list() ->
    ?LET(L, list(choose(1024, 4096)), intervals(lists:usort(L))).

shuffle_interval() ->
    ?LET(L, interval_list(), shuffle(L)).

get_written_interval(L) ->
    ?LET({O, Ln}, elements(L), {O+1, Ln-1}).

%% INITIALIZATION

-record(state, {pid, prev_extra = 0, planned_writes=[], written=[]}).

initial_state() -> #state{written=[{0,1024}]}.
initial_state(I) -> #state{written=[{0,1024}], planned_writes=I}.

weight(_S, rewrite) -> 1;
weight(_S, _) -> 2.

%% HELPERS

%% check if an operation is permitted based on whether a write has
%% occurred
check_writes(_Op, [], _Off, _L) ->
    false;
check_writes(_Op, [{Pos, Sz}|_T], Off, L) when Pos == Off
                                               andalso Sz == L ->
    mostly_true;
check_writes(read, [{Pos, Sz}|_T], Off, L) when Off >= Pos
                                                andalso Off < (Pos + Sz)
                                                andalso Sz >= ( L - ( Off - Pos ) ) ->
    true;
check_writes(write, [{Pos, Sz}|_T], Off, L) when ( Off + L ) > Pos
                                                 andalso Off < (Pos + Sz) ->
    true;
check_writes(Op, [_H|T], Off, L) ->
    check_writes(Op, T, Off, L).
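
%% Worked examples against the initial model written=[{0,1024}]:
%%   check_writes(read,  [{0,1024}],   10, 100) -> true   (inside a written range)
%%   check_writes(write, [{0,1024}],  512,  10) -> true   (overlap, so the write must fail)
%%   check_writes(write, [{0,1024}], 2048,  10) -> false  (untouched range, write is OK)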
is_error({error, _}) -> true;
is_error({error, _, _}) -> true;
is_error(Other) -> {expected_ERROR, Other}.

probably_error(ok) -> true;
probably_error(V) -> is_error(V).

is_ok({ok, _, _}) -> true;
is_ok(ok) -> true;
is_ok(Other) -> {expected_OK, Other}.

get_offset({ok, _Filename, Offset}) -> Offset;
get_offset(_) -> error(badarg).

offset_valid(Offset, Extra, L) ->
    {Pos, Sz} = lists:last(L),
    Offset == Pos + Sz + Extra.

-define(TESTDIR, "./eqc").

cleanup() ->
    [begin
         Fs = filelib:wildcard(?TESTDIR ++ Glob),
         [file:delete(F) || F <- Fs],
         [file:del_dir(F) || F <- Fs]
     end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
    _ = file:del_dir(?TESTDIR),
    ok.

%% start

start_pre(S) ->
    S#state.pid == undefined.

start_command(S) ->
    {call, ?MODULE, start, [S]}.

start(_S) ->
    {_, _, MS} = os:timestamp(),
    File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
    {ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR),
    unlink(Pid),
    Pid.

start_next(S, Pid, _Args) ->
    S#state{pid = Pid}.

%% read

read_pre(S) ->
    S#state.pid /= undefined.

read_args(S) ->
    [S#state.pid, offset(), len()].

read_ok(S, Off, L) ->
    case S#state.written of
        [{0, 1024}] -> false;
        W -> check_writes(read, W, Off, L)
    end.

read_post(S, [_Pid, Off, L], Res) ->
    case read_ok(S, Off, L) of
        true -> is_ok(Res);
        mostly_true -> is_ok(Res);
        false -> is_error(Res)
    end.

read_next(S, _Res, _Args) -> S.

read(Pid, Offset, Length) ->
    machi_file_proxy:read(Pid, Offset, Length).

%% write

write_pre(S) ->
    S#state.pid /= undefined andalso S#state.planned_writes /= [].

%% do not allow writes with empty data
write_pre(_S, [_Pid, _Off, {<<>>, _Tag, _Csum}]) ->
    false;
write_pre(_S, _Args) ->
    true.

write_args(S) ->
    {Off, Len} = hd(S#state.planned_writes),
    [S#state.pid, Off, data_with_csum(Len)].

write_ok(_S, [_Pid, Off, _Data]) when Off < 1024 -> false;
write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) ->
    Size = iolist_size(Bin),
    %% check_writes/4 reports whether a byte range is already *written*,
    %% so a write is OK iff the range is NOT written; i.e. we want
    %% check_writes/4 to come back false.
    check_writes(write, S#state.written, Off, Size).

write_post(S, Args, Res) ->
    case write_ok(S, Args) of
        %% false means this range has NOT been written before, so
        %% it should succeed
        false -> eq(Res, ok);
        %% mostly_true means we've written this range before, BUT
        %% as a special case, a call to write the EXACT same data
        %% that's already on disk returns "ok" instead of
        %% {error, written}.
        mostly_true -> probably_error(Res);
        %% true means we've already written this range, or a portion
        %% of it, to disk, so the write should return an error.
        true -> is_error(Res)
    end.

write_next(S, Res, [_Pid, Offset, {Bin, _Tag, _Csum}]) ->
    S0 = case is_ok(Res) of
             true ->
                 S#state{written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}]) };
             _ ->
                 S
         end,
    S0#state{prev_extra = 0, planned_writes=tl(S0#state.planned_writes)}.

write(Pid, Offset, {Bin, Tag, Csum}) ->
    Meta = [{client_csum_tag, Tag},
            {client_csum, Csum}],
    machi_file_proxy:write(Pid, Offset, Meta, Bin).

%% append

append_pre(S) ->
    S#state.pid /= undefined.

%% do not allow appends with empty binary data
append_pre(_S, [_Pid, _Extra, {<<>>, _Tag, _Csum}]) ->
    false;
append_pre(_S, _Args) ->
    true.

append_args(S) ->
    [S#state.pid, default(0, len()), data_with_csum()].

append(Pid, Extra, {Bin, Tag, Csum}) ->
    Meta = [{client_csum_tag, Tag},
            {client_csum, Csum}],
    machi_file_proxy:append(Pid, Meta, Extra, Bin).

append_next(S, Res, [_Pid, Extra, {Bin, _Tag, _Csum}]) ->
    case is_ok(Res) of
        true ->
            Offset = get_offset(Res),
            true = offset_valid(Offset, S#state.prev_extra, S#state.written),
            S#state{prev_extra = Extra, written = lists:sort(S#state.written ++ [{Offset, iolist_size(Bin)}])};
        _ ->
            S
    end.

%% appends should always succeed unless the disk is full
%% or there's a hardware failure.
append_post(_S, _Args, Res) ->
    true == is_ok(Res).

%% rewrite

rewrite_pre(S) ->
    S#state.pid /= undefined andalso S#state.written /= [].

rewrite_args(S) ->
    ?LET({Off, Len}, get_written_interval(S#state.written),
         [S#state.pid, Off, data_with_csum(Len)]).

rewrite(Pid, Offset, {Bin, Tag, Csum}) ->
    Meta = [{client_csum_tag, Tag},
            {client_csum, Csum}],
    machi_file_proxy:write(Pid, Offset, Meta, Bin).

rewrite_post(_S, _Args, Res) ->
    is_error(Res).

rewrite_next(S, _Res, _Args) ->
    S#state{prev_extra = 0}.

%% Property

prop_ok() ->
    cleanup(),
    ?FORALL(I, shuffle_interval(),
    ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I)),
        begin
            {H, S, Res} = run_parallel_commands(?MODULE, Cmds),
            pretty_commands(?MODULE, Cmds, {H, S, Res},
                            aggregate(command_names(Cmds), Res == ok))
        end)
    ).

-endif. % EQC
-endif. % TEST
103
test/machi_file_proxy_test.erl
Normal file

@@ -0,0 +1,103 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(machi_file_proxy_test).

-ifdef(TEST).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include("machi.hrl").

clean_up_data_dir(DataDir) ->
    [begin
         Fs = filelib:wildcard(DataDir ++ Glob),
         [file:delete(F) || F <- Fs],
         [file:del_dir(F) || F <- Fs]
     end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
    _ = file:del_dir(DataDir),
    ok.

-ifndef(PULSE).

-define(TESTDIR, "./t").
-define(HYOOGE, 1 * 1024 * 1024 * 1024). % 1 long GB

random_binary_single() ->
    %% OK, I guess it's not that random...
    <<"Four score and seven years ago our fathers brought forth on this
continent a new nation, conceived in liberty, and dedicated to the
proposition that all men are created equal.

Now we are engaged in a great civil war, testing whether that nation, or any
nation so conceived and so dedicated, can long endure. We are met on a great
battlefield of that war. We have come to dedicate a portion of that field, as a
final resting place for those who here gave their lives that that nation
might live. It is altogether fitting and proper that we should do this.

But, in a larger sense, we can not dedicate, we can not consecrate, we can not
hallow this ground. The brave men, living and dead, who struggled here, have
consecrated it, far above our poor power to add or detract. The world will
little note, nor long remember what we say here, but it can never forget what
they did here. It is for us the living, rather, to be dedicated here to the
unfinished work which they who fought here have thus far so nobly advanced. It
is rather for us to be here dedicated to the great task remaining before us—
that from these honored dead we take increased devotion to that cause for which
they gave the last full measure of devotion— that we here highly resolve that
these dead shall not have died in vain— that this nation, under God, shall have
a new birth of freedom— and that government of the people, by the people, for
the people, shall not perish from the earth.">>.

random_binary(Start, End) ->
    %% NB: binary:part/3 takes a length as its third argument, so End here
    %% is really the number of bytes returned, not an absolute position.
    Size = byte_size(random_binary_single()) - 1,
    case End > Size of
        true ->
            Copies = ( End div Size ) + 1,
            D0 = binary:copy(random_binary_single(), Copies),
            binary:part(<<D0/binary>>, Start, End);
        false ->
            binary:part(random_binary_single(), Start, End)
    end.

machi_file_proxy_test_() ->
    clean_up_data_dir(?TESTDIR),
    {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
    [
     ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
     ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
     ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
     ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
     ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
     ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
     ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
     ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
     ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
     ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
     ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
     ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
     ?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
     ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
     ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
     ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
    ].
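
%% The test generator above runs under eunit in the usual way, e.g.
%% eunit:test(machi_file_proxy_test).
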
-endif. % !PULSE
-endif. % TEST.