WIP: client side projection store, 1st API op (write), part II

This commit is contained in:
Scott Lystig Fritchie 2015-04-03 17:10:52 +09:00
parent 7205c5283e
commit acf54e3c21
9 changed files with 660 additions and 26 deletions

View file

@ -23,11 +23,14 @@
-include_lib("kernel/include/file.hrl").
-include("machi.hrl").
-include("machi_projection.hrl").
-export([start_link/1, stop/1]).
-record(state, {
reg_name :: atom(),
proj_store :: pid(),
append_pid :: pid(),
tcp_port :: non_neg_integer(),
data_dir :: string(),
wedge = true :: 'disabled' | boolean(),
@ -52,10 +55,16 @@ stop(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir, Rest) ->
S1 = #state{reg_name=RegName,
S0 = #state{reg_name=RegName,
tcp_port=TcpPort,
data_dir=DataDir,
props=Rest},
AppendPid = start_append_server(S0),
ProjRegName = make_projection_server_regname(RegName),
{ok, ProjectionPid} =
machi_projection_store:start_link(ProjRegName, DataDir, AppendPid),
S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid},
S2 = case proplists:get_value(dbg, Rest) of
undefined ->
S1;
@ -64,10 +73,18 @@ main2(RegName, TcpPort, DataDir, Rest) ->
dbg_props=DbgProps,
props=lists:keydelete(dbg, 1, Rest)}
end,
AppendPid = start_append_server(S2),
ListenPid = start_listen_server(S2),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
{_, Data_e} = machi_util:make_data_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Data_e),
Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e),
put(flu_reg_name, RegName),
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
receive forever -> ok end.
@ -77,6 +94,9 @@ start_listen_server(S) ->
start_append_server(S) ->
spawn_link(fun() -> run_append_server(S) end).
%% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end).
run_listen_server(#state{tcp_port=TcpPort}=S) ->
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
@ -97,7 +117,9 @@ append_server_loop(#state{data_dir=DataDir}=S) ->
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end),
append_server_loop(S)
append_server_loop(S);
{wedge_state_change, Boolean} ->
append_server_loop(S#state{wedge=Boolean})
end.
-define(EpochIDSpace, (4+20)).
@ -152,6 +174,8 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
_EpochIDRaw:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
do_net_server_truncate_hackityhack(Sock, File, DataDir);
<<"PROJ ", LenHex:8/binary, "\n">> ->
do_projection_command(Sock, LenHex, S);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
@ -249,7 +273,6 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
file:close(FH)
end;
{error, enoent} when OptsHasWrite ->
ok = filelib:ensure_dir(Path),
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun);
@ -315,10 +338,11 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
do_net_server_listing(Sock, DataDir) ->
Files = filelib:wildcard("*", DataDir) -- ["config"],
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
Files = filelib:wildcard("*", WildPath),
Out = ["OK\n",
[begin
{ok, FI} = file:read_file_info(DataDir ++ "/" ++ File),
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
Size = FI#file_info.size,
SizeBin = <<Size:64/big>>,
[machi_util:bin_to_hexstr(SizeBin), <<" ">>,
@ -453,8 +477,6 @@ start_seq_append_server(Prefix, DataDir) ->
run_seq_append_server(Prefix, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()),
ok = filelib:ensure_dir(DataDir ++ "/unused"),
ok = filelib:ensure_dir(DataDir ++ "/config/unused"),
run_seq_append_server2(Prefix, DataDir).
run_seq_append_server2(Prefix, DataDir) ->
@ -521,3 +543,36 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
exit(normal)
end.
do_projection_command(Sock, LenHex, S) ->
try
Len = machi_util:hexstr_to_int(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, ProjCmdBin} = gen_tcp:recv(Sock, Len),
ok = inet:setopts(Sock, [{packet, line}]),
ProjCmd = binary_to_term(ProjCmdBin),
case handle_projection_command(ProjCmd, S) of
ok ->
ok = gen_tcp:send(Sock, <<"OK\n">>);
{error, written} ->
ok = gen_tcp:send(Sock, <<"ERROR WRITTEN\n">>);
{error, not_written} ->
ok = gen_tcp:send(Sock, <<"ERROR NOT-WRITTEN\n">>);
Else ->
TODO = list_to_binary(io_lib:format("TODO-YOLO-~w", [Else])),
ok = gen_tcp:send(Sock, [<<"ERROR ">>, TODO, <<"\n">>])
end
catch
What:Why ->
WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w",
[What, Why, erlang:get_stacktrace()])),
_ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>]))
end.
handle_projection_command({write_projection, ProjType, Proj},
#state{proj_store=ProjStore}) ->
machi_projection_store:write(ProjStore, ProjType, Proj);
handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}.
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_projection").

View file

@ -150,7 +150,9 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
-spec write_projection(port(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Sock, ProjType, Proj) ->
write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
write_projection2(Sock, ProjType, Proj).
%% @doc Write a projection `Proj' of type `ProjType'.
@ -158,7 +160,9 @@ write_projection(Sock, ProjType, Proj) ->
-spec write_projection(inet_host(), inet_port(),
projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Host, TcpPort, ProjType, Proj) ->
write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Sock = machi_util:connect(Host, TcpPort),
try
write_projection2(Sock, ProjType, Proj)

View file

@ -0,0 +1,164 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_projection_store).
-include("machi_projection.hrl").
%% API
-export([
start_link/3,
write/3, write/4
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {
public_dir = "" :: string(),
private_dir = "" :: string(),
wedged = true :: boolean(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = -1 :: non_neg_integer(),
max_private_epoch = -1 :: non_neg_integer()
}).
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
gen_server:start_link({local, RegName},
?MODULE, [DataDir, NotifyWedgeStateChanges], []).
write(PidSpec, ProjType, Proj) ->
write(PidSpec, ProjType, Proj, infinity).
write(PidSpec, ProjType, Proj, Timeout)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
init([DataDir, NotifyWedgeStateChanges]) ->
lclock_init(),
PublicDir = machi_util:make_projection_filename(DataDir, "public"),
PrivateDir = machi_util:make_projection_filename(DataDir, "private"),
ok = filelib:ensure_dir(PublicDir ++ "/ignored"),
ok = filelib:ensure_dir(PrivateDir ++ "/ignored"),
MaxPublicEpoch = find_max_epoch(PublicDir),
MaxPrivateEpoch = find_max_epoch(PrivateDir),
{ok, #state{public_dir=PublicDir,
private_dir=PrivateDir,
wedged=true,
wedge_notify_pid=NotifyWedgeStateChanges,
max_public_epoch=MaxPublicEpoch,
max_private_epoch=MaxPrivateEpoch}}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
g_call(PidSpec, Arg, Timeout) ->
LC1 = lclock_get(),
{Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout),
lclock_update(LC2),
Res.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle_call({{write, ProjType, Proj}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_write(ProjType, Proj, S),
{reply, {Reply, LC2}, NewS};
handle_call(_Request, _From, S) ->
Reply = whaaaaaaaaaaaaa,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
%% TODO: We probably ought to check the projection checksum for sanity, eh?
Dir = pick_path(ProjType, S),
Path = filename:join(Dir, epoch2name(Epoch)),
case file:read_file_info(Path) of
{ok, _FI} ->
{{error, written}, S};
{error, enoent} ->
{ok, FH} = file:open(Path, [write, raw, binary]),
ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH),
ok = file:close(FH),
{ok, S};
{error, Else} ->
{{error, Else}, S}
end.
pick_path(public, S) ->
S#state.public_dir;
pick_path(private, S) ->
S#state.private_dir.
epoch2name(Epoch) ->
machi_util:int_to_hexstr(Epoch, 32).
name2epoch(Name) ->
machi_util:hexstr_to_int(Name).
find_max_epoch(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
-1;
true ->
name2epoch(lists:last(Fs))
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
-ifdef(TEST).
lclock_init() ->
lamport_clock:init().
lclock_get() ->
lamport_clock:get().
lclock_update(LC) ->
lamport_clock:update(LC).
-else. % TEST
lclock_init() ->
ok.
lclock_get() ->
ok.
lclock_update(_LC) ->
ok.
-endif. % TEST

View file

@ -27,6 +27,7 @@
make_binary/1, make_string/1,
make_regname/1,
make_checksum_filename/2, make_data_filename/2,
make_projection_filename/2,
read_max_filenum/2, increment_max_filenum/2,
info_msg/2, verb/1, verb/2,
%% TCP protocol helpers
@ -60,19 +61,29 @@ make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum",
[DataDir, Prefix, SequencerName, FileNum])).
make_checksum_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/config", [DataDir]));
make_checksum_filename(DataDir, FileName) ->
lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])).
make_data_filename(DataDir, "") ->
FullPath = lists:flatten(io_lib:format("~s/data", [DataDir])),
{"", FullPath};
make_data_filename(DataDir, File) ->
FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])),
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
[Prefix, SequencerName, FileNum])),
FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])),
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
make_projection_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/projection", [DataDir]));
make_projection_filename(DataDir, File) ->
lists:flatten(io_lib:format("~s/projection/~s", [DataDir, File])).
read_max_filenum(DataDir, Prefix) ->
case file:read_file_info(make_config_filename(DataDir, Prefix)) of
{error, enoent} ->

View file

@ -44,7 +44,7 @@ verify_file_checksums_test() ->
{ok, []} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
Path = DataDir ++ "/" ++ binary_to_list(File),
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
{ok, FH} = file:open(Path, [read,write]),
{ok, _} = file:position(FH, ?MINIMUM_OFFSET),
ok = file:write(FH, "y"),

View file

@ -37,6 +37,10 @@ setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
%% TODO the process structuring/racy-ness of the various processes
%% of the FLU needs to be deterministic to remove this sleep race
%% "prevention".
timer:sleep(10),
FLU1.
flu_smoke_test() ->
@ -113,22 +117,18 @@ flu_smoke_test() ->
ok = ?FLU:stop(FLU1)
end.
flu_projection_test() ->
flu_projection_smoke_test() ->
Host = "localhost",
TcpPort = 32959,
DataDir = "./data",
Prefix = <<"prefix!">>,
BadPrefix = BadFile = "no/good",
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"does-not-exist"),
P1 = machi_projection:new(1, a, [a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, public, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, public, P1),
ok = ?FLU_C:write_projection(Host, TcpPort, private, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, private, P1),
ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
after
@ -136,12 +136,11 @@ flu_projection_test() ->
end.
clean_up_data_dir(DataDir) ->
Dir1 = DataDir ++ "/config",
Fs1 = filelib:wildcard(Dir1 ++ "/*"),
[file:delete(F) || F <- Fs1],
_ = file:del_dir(Dir1),
Fs2 = filelib:wildcard(DataDir ++ "/*"),
[file:delete(F) || F <- Fs2],
[begin
Fs = filelib:wildcard(DataDir ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(DataDir),
ok.

View file

@ -0,0 +1,154 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% File : handle_errors.erl
%%% Author : Ulf Norell
%%% Description :
%%% Created : 26 Mar 2012 by Ulf Norell
-module(event_logger).
-compile(export_all).
-behaviour(gen_server).
%% API
-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]).
-export([timestamp/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { start_time, events = [] }).
-record(event, { timestamp, data }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_logging() ->
gen_server:call(?MODULE, {start, timestamp()}).
event(EventData) ->
event(EventData, timestamp()).
event(EventData, Timestamp) ->
gen_server:call(?MODULE,
#event{ timestamp = Timestamp, data = EventData }).
async_event(EventData) ->
gen_server:cast(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
get_events() ->
gen_server:call(?MODULE, get_events).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Event = #event{}, _From, State) ->
{reply, ok, add_event(Event, State)};
handle_call({start, Now}, _From, S) ->
{reply, ok, S#state{ events = [], start_time = Now }};
handle_call(get_events, _From, S) ->
{reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
S#state{ events = [] }};
handle_call(Request, _From, State) ->
{reply, {error, {bad_call, Request}}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Event = #event{}, State) ->
{noreply, add_event(Event, State)};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
lamport_clock:get().

View file

@ -0,0 +1,174 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%%-------------------------------------------------------------------
%%% @author Hans Svensson <>
%%% @copyright (C) 2012, Hans Svensson
%%% @doc
%%%
%%% @end
%%% Created : 19 Mar 2012 by Hans Svensson <>
%%%-------------------------------------------------------------------
-module(handle_errors).
-behaviour(gen_event).
%% API
-export([start_link/0, add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { errors = [] }).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Creates an event manager
%%
%% @spec start_link() -> {ok, Pid} | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_event:start_link({local, ?SERVER}).
%%--------------------------------------------------------------------
%% @doc
%% Adds an event handler
%%
%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler() ->
gen_event:add_handler(?SERVER, ?MODULE, []).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
{ok, State};
handle_event({error, _, Event}, State) ->
{ok, State#state{ errors = [Event|State#state.errors] }};
handle_event(_Event, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(get_errors, S) ->
{ok, S#state.errors, S#state{ errors = [] }};
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View file

@ -0,0 +1,73 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(lamport_clock).
-export([init/0, reset/0, get/0, update/1, incr/0]).
-define(KEY, ?MODULE).
-ifdef(TEST).
init() ->
case get(?KEY) of
undefined ->
reset();
N when is_integer(N) ->
ok
end.
reset() ->
FakeTOD = 0,
put(?KEY, FakeTOD + 1).
get() ->
init(),
get(?KEY).
update(Remote) ->
New = erlang:max(get(?KEY), Remote) + 1,
put(?KEY, New),
New.
incr() ->
New = get(?KEY) + 1,
put(?KEY, New),
New.
-else. % TEST
init() ->
ok.
reset() ->
ok.
get() ->
ok.
update(_) ->
ok.
incr() ->
ok.
-endif. % TEST