diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 78fc3b9..8bd710d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -23,11 +23,14 @@ -include_lib("kernel/include/file.hrl"). -include("machi.hrl"). +-include("machi_projection.hrl"). -export([start_link/1, stop/1]). -record(state, { reg_name :: atom(), + proj_store :: pid(), + append_pid :: pid(), tcp_port :: non_neg_integer(), data_dir :: string(), wedge = true :: 'disabled' | boolean(), @@ -52,10 +55,16 @@ stop(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%% main2(RegName, TcpPort, DataDir, Rest) -> - S1 = #state{reg_name=RegName, + S0 = #state{reg_name=RegName, tcp_port=TcpPort, data_dir=DataDir, props=Rest}, + AppendPid = start_append_server(S0), + ProjRegName = make_projection_server_regname(RegName), + {ok, ProjectionPid} = + machi_projection_store:start_link(ProjRegName, DataDir, AppendPid), + S1 = S0#state{append_pid=AppendPid, + proj_store=ProjectionPid}, S2 = case proplists:get_value(dbg, Rest) of undefined -> S1; @@ -64,10 +73,18 @@ main2(RegName, TcpPort, DataDir, Rest) -> dbg_props=DbgProps, props=lists:keydelete(dbg, 1, Rest)} end, - AppendPid = start_append_server(S2), ListenPid = start_listen_server(S2), + + Config_e = machi_util:make_config_filename(DataDir, "unused"), + ok = filelib:ensure_dir(Config_e), + {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"), + ok = filelib:ensure_dir(Data_e), + Projection_e = machi_util:make_projection_filename(DataDir, "unused"), + ok = filelib:ensure_dir(Projection_e), + put(flu_reg_name, RegName), put(flu_append_pid, AppendPid), + put(flu_projection_pid, ProjectionPid), put(flu_listen_pid, ListenPid), receive forever -> ok end. @@ -77,6 +94,9 @@ start_listen_server(S) -> start_append_server(S) -> spawn_link(fun() -> run_append_server(S) end). +%% start_projection_server(S) -> +%% spawn_link(fun() -> run_projection_server(S) end). + run_listen_server(#state{tcp_port=TcpPort}=S) -> SockOpts = [{reuseaddr, true}, {mode, binary}, {active, false}, {packet, line}], @@ -97,7 +117,9 @@ append_server_loop(#state{data_dir=DataDir}=S) -> {seq_append, From, Prefix, Chunk, CSum} -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) end), - append_server_loop(S) + append_server_loop(S); + {wedge_state_change, Boolean} -> + append_server_loop(S#state{wedge=Boolean}) end. -define(EpochIDSpace, (4+20)). @@ -152,6 +174,8 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> _EpochIDRaw:(?EpochIDSpace)/binary, File:DelFileLenLF/binary, "\n">> -> do_net_server_truncate_hackityhack(Sock, File, DataDir); + <<"PROJ ", LenHex:8/binary, "\n">> -> + do_projection_command(Sock, LenHex, S); _ -> machi_util:verb("Else Got: ~p\n", [Line]), gen_tcp:send(Sock, "ERROR SYNTAX\n"), @@ -249,7 +273,6 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, file:close(FH) end; {error, enoent} when OptsHasWrite -> - ok = filelib:ensure_dir(Path), do_net_server_readwrite_common( Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun); @@ -315,10 +338,11 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) -> ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). do_net_server_listing(Sock, DataDir) -> - Files = filelib:wildcard("*", DataDir) -- ["config"], + {_, WildPath} = machi_util:make_data_filename(DataDir, ""), + Files = filelib:wildcard("*", WildPath), Out = ["OK\n", [begin - {ok, FI} = file:read_file_info(DataDir ++ "/" ++ File), + {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), Size = FI#file_info.size, SizeBin = <>, [machi_util:bin_to_hexstr(SizeBin), <<" ">>, @@ -453,8 +477,6 @@ start_seq_append_server(Prefix, DataDir) -> run_seq_append_server(Prefix, DataDir) -> true = register(machi_util:make_regname(Prefix), self()), - ok = filelib:ensure_dir(DataDir ++ "/unused"), - ok = filelib:ensure_dir(DataDir ++ "/config/unused"), run_seq_append_server2(Prefix, DataDir). run_seq_append_server2(Prefix, DataDir) -> @@ -521,3 +543,36 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> exit(normal) end. +do_projection_command(Sock, LenHex, S) -> + try + Len = machi_util:hexstr_to_int(LenHex), + ok = inet:setopts(Sock, [{packet, raw}]), + {ok, ProjCmdBin} = gen_tcp:recv(Sock, Len), + ok = inet:setopts(Sock, [{packet, line}]), + ProjCmd = binary_to_term(ProjCmdBin), + case handle_projection_command(ProjCmd, S) of + ok -> + ok = gen_tcp:send(Sock, <<"OK\n">>); + {error, written} -> + ok = gen_tcp:send(Sock, <<"ERROR WRITTEN\n">>); + {error, not_written} -> + ok = gen_tcp:send(Sock, <<"ERROR NOT-WRITTEN\n">>); + Else -> + TODO = list_to_binary(io_lib:format("TODO-YOLO-~w", [Else])), + ok = gen_tcp:send(Sock, [<<"ERROR ">>, TODO, <<"\n">>]) + end + catch + What:Why -> + WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w", + [What, Why, erlang:get_stacktrace()])), + _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>])) + end. + +handle_projection_command({write_projection, ProjType, Proj}, + #state{proj_store=ProjStore}) -> + machi_projection_store:write(ProjStore, ProjType, Proj); +handle_projection_command(Else, _S) -> + {error, unknown_cmd, Else}. + +make_projection_server_regname(BaseName) -> + list_to_atom(atom_to_list(BaseName) ++ "_projection"). diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index edbf90f..1965d8a 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -150,7 +150,9 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> -spec write_projection(port(), projection_type(), projection()) -> 'ok' | {error, written} | {error, term()}. -write_projection(Sock, ProjType, Proj) -> +write_projection(Sock, ProjType, Proj) + when ProjType == 'public' orelse ProjType == 'private', + is_record(Proj, projection_v1) -> write_projection2(Sock, ProjType, Proj). %% @doc Write a projection `Proj' of type `ProjType'. @@ -158,7 +160,9 @@ write_projection(Sock, ProjType, Proj) -> -spec write_projection(inet_host(), inet_port(), projection_type(), projection()) -> 'ok' | {error, written} | {error, term()}. -write_projection(Host, TcpPort, ProjType, Proj) -> +write_projection(Host, TcpPort, ProjType, Proj) + when ProjType == 'public' orelse ProjType == 'private', + is_record(Proj, projection_v1) -> Sock = machi_util:connect(Host, TcpPort), try write_projection2(Sock, ProjType, Proj) diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl new file mode 100644 index 0000000..526113b --- /dev/null +++ b/src/machi_projection_store.erl @@ -0,0 +1,164 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_projection_store). + +-include("machi_projection.hrl"). + +%% API +-export([ + start_link/3, + write/3, write/4 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, { + public_dir = "" :: string(), + private_dir = "" :: string(), + wedged = true :: boolean(), + wedge_notify_pid :: pid() | atom(), + max_public_epoch = -1 :: non_neg_integer(), + max_private_epoch = -1 :: non_neg_integer() + }). + +start_link(RegName, DataDir, NotifyWedgeStateChanges) -> + gen_server:start_link({local, RegName}, + ?MODULE, [DataDir, NotifyWedgeStateChanges], []). + +write(PidSpec, ProjType, Proj) -> + write(PidSpec, ProjType, Proj, infinity). + +write(PidSpec, ProjType, Proj, Timeout) + when ProjType == 'public' orelse ProjType == 'private', + is_record(Proj, projection_v1) -> + g_call(PidSpec, {write, ProjType, Proj}, Timeout). + +init([DataDir, NotifyWedgeStateChanges]) -> + lclock_init(), + PublicDir = machi_util:make_projection_filename(DataDir, "public"), + PrivateDir = machi_util:make_projection_filename(DataDir, "private"), + ok = filelib:ensure_dir(PublicDir ++ "/ignored"), + ok = filelib:ensure_dir(PrivateDir ++ "/ignored"), + MaxPublicEpoch = find_max_epoch(PublicDir), + MaxPrivateEpoch = find_max_epoch(PrivateDir), + + {ok, #state{public_dir=PublicDir, + private_dir=PrivateDir, + wedged=true, + wedge_notify_pid=NotifyWedgeStateChanges, + max_public_epoch=MaxPublicEpoch, + max_private_epoch=MaxPrivateEpoch}}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +g_call(PidSpec, Arg, Timeout) -> + LC1 = lclock_get(), + {Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout), + lclock_update(LC2), + Res. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_call({{write, ProjType, Proj}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), + {Reply, NewS} = do_proj_write(ProjType, Proj, S), + {reply, {Reply, LC2}, NewS}; +handle_call(_Request, _From, S) -> + Reply = whaaaaaaaaaaaaa, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> + %% TODO: We probably ought to check the projection checksum for sanity, eh? + Dir = pick_path(ProjType, S), + Path = filename:join(Dir, epoch2name(Epoch)), + case file:read_file_info(Path) of + {ok, _FI} -> + {{error, written}, S}; + {error, enoent} -> + {ok, FH} = file:open(Path, [write, raw, binary]), + ok = file:write(FH, term_to_binary(Proj)), + ok = file:sync(FH), + ok = file:close(FH), + {ok, S}; + {error, Else} -> + {{error, Else}, S} + end. + +pick_path(public, S) -> + S#state.public_dir; +pick_path(private, S) -> + S#state.private_dir. + +epoch2name(Epoch) -> + machi_util:int_to_hexstr(Epoch, 32). + +name2epoch(Name) -> + machi_util:hexstr_to_int(Name). + +find_max_epoch(Dir) -> + Fs = lists:sort(filelib:wildcard("*", Dir)), + if Fs == [] -> + -1; + true -> + name2epoch(lists:last(Fs)) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-ifdef(TEST). + +lclock_init() -> + lamport_clock:init(). + +lclock_get() -> + lamport_clock:get(). + +lclock_update(LC) -> + lamport_clock:update(LC). + +-else. % TEST + +lclock_init() -> + ok. + +lclock_get() -> + ok. + +lclock_update(_LC) -> + ok. + +-endif. % TEST diff --git a/src/machi_util.erl b/src/machi_util.erl index f526803..1331d11 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -27,6 +27,7 @@ make_binary/1, make_string/1, make_regname/1, make_checksum_filename/2, make_data_filename/2, + make_projection_filename/2, read_max_filenum/2, increment_max_filenum/2, info_msg/2, verb/1, verb/2, %% TCP protocol helpers @@ -60,19 +61,29 @@ make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) -> lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum", [DataDir, Prefix, SequencerName, FileNum])). +make_checksum_filename(DataDir, "") -> + lists:flatten(io_lib:format("~s/config", [DataDir])); make_checksum_filename(DataDir, FileName) -> lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])). +make_data_filename(DataDir, "") -> + FullPath = lists:flatten(io_lib:format("~s/data", [DataDir])), + {"", FullPath}; make_data_filename(DataDir, File) -> - FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])), + FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])), {File, FullPath}. make_data_filename(DataDir, Prefix, SequencerName, FileNum) -> File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w", [Prefix, SequencerName, FileNum])), - FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])), + FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])), {File, FullPath}. +make_projection_filename(DataDir, "") -> + lists:flatten(io_lib:format("~s/projection", [DataDir])); +make_projection_filename(DataDir, File) -> + lists:flatten(io_lib:format("~s/projection/~s", [DataDir, File])). + read_max_filenum(DataDir, Prefix) -> case file:read_file_info(make_config_filename(DataDir, Prefix)) of {error, enoent} -> diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 0a44a15..8555959 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -44,7 +44,7 @@ verify_file_checksums_test() -> {ok, []} = machi_admin_util:verify_file_checksums_remote( Host, TcpPort, ?DUMMY_PV1_EPOCH, File), - Path = DataDir ++ "/" ++ binary_to_list(File), + {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), {ok, FH} = file:open(Path, [read,write]), {ok, _} = file:position(FH, ?MINIMUM_OFFSET), ok = file:write(FH, "y"), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 0334d73..3a36800 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -37,6 +37,10 @@ setup_test_flu(RegName, TcpPort, DataDir, DbgProps) -> {ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}, {dbg, DbgProps}]), + %% TODO the process structuring/racy-ness of the various processes + %% of the FLU needs to be deterministic to remove this sleep race + %% "prevention". + timer:sleep(10), FLU1. flu_smoke_test() -> @@ -113,22 +117,18 @@ flu_smoke_test() -> ok = ?FLU:stop(FLU1) end. -flu_projection_test() -> +flu_projection_smoke_test() -> Host = "localhost", TcpPort = 32959, DataDir = "./data", - Prefix = <<"prefix!">>, - BadPrefix = BadFile = "no/good", FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try - {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort, - ?DUMMY_PV1_EPOCH, - "does-not-exist"), - P1 = machi_projection:new(1, a, [a], [], [a], [], []), ok = ?FLU_C:write_projection(Host, TcpPort, public, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, public, P1), + ok = ?FLU_C:write_projection(Host, TcpPort, private, P1), + {error, written} = ?FLU_C:write_projection(Host, TcpPort, private, P1), ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort)) after @@ -136,12 +136,11 @@ flu_projection_test() -> end. clean_up_data_dir(DataDir) -> - Dir1 = DataDir ++ "/config", - Fs1 = filelib:wildcard(Dir1 ++ "/*"), - [file:delete(F) || F <- Fs1], - _ = file:del_dir(Dir1), - Fs2 = filelib:wildcard(DataDir ++ "/*"), - [file:delete(F) || F <- Fs2], + [begin + Fs = filelib:wildcard(DataDir ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], _ = file:del_dir(DataDir), ok. diff --git a/test/pulse_util/event_logger.erl b/test/pulse_util/event_logger.erl new file mode 100644 index 0000000..f6a39d0 --- /dev/null +++ b/test/pulse_util/event_logger.erl @@ -0,0 +1,154 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%% File : handle_errors.erl +%%% Author : Ulf Norell +%%% Description : +%%% Created : 26 Mar 2012 by Ulf Norell +-module(event_logger). + +-compile(export_all). + +-behaviour(gen_server). + +%% API +-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]). +-export([timestamp/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { start_time, events = [] }). + +-record(event, { timestamp, data }). + + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +start_logging() -> + gen_server:call(?MODULE, {start, timestamp()}). + +event(EventData) -> + event(EventData, timestamp()). + +event(EventData, Timestamp) -> + gen_server:call(?MODULE, + #event{ timestamp = Timestamp, data = EventData }). + +async_event(EventData) -> + gen_server:cast(?MODULE, + #event{ timestamp = timestamp(), data = EventData }). + +get_events() -> + gen_server:call(?MODULE, get_events). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(Event = #event{}, _From, State) -> + {reply, ok, add_event(Event, State)}; +handle_call({start, Now}, _From, S) -> + {reply, ok, S#state{ events = [], start_time = Now }}; +handle_call(get_events, _From, S) -> + {reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]), + S#state{ events = [] }}; +handle_call(Request, _From, State) -> + {reply, {error, {bad_call, Request}}, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(Event = #event{}, State) -> + {noreply, add_event(Event, State)}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +add_event(#event{timestamp = Now, data = Data}, State) -> + Event = #event{ timestamp = Now, data = Data }, + State#state{ events = [Event|State#state.events] }. + +timestamp() -> + lamport_clock:get(). diff --git a/test/pulse_util/handle_errors.erl b/test/pulse_util/handle_errors.erl new file mode 100644 index 0000000..97965b8 --- /dev/null +++ b/test/pulse_util/handle_errors.erl @@ -0,0 +1,174 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%%%------------------------------------------------------------------- +%%% @author Hans Svensson <> +%%% @copyright (C) 2012, Hans Svensson +%%% @doc +%%% +%%% @end +%%% Created : 19 Mar 2012 by Hans Svensson <> +%%%------------------------------------------------------------------- +-module(handle_errors). + +-behaviour(gen_event). + +%% API +-export([start_link/0, add_handler/0]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { errors = [] }). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Creates an event manager +%% +%% @spec start_link() -> {ok, Pid} | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?SERVER}). + +%%-------------------------------------------------------------------- +%% @doc +%% Adds an event handler +%% +%% @spec add_handler() -> ok | {'EXIT', Reason} | term() +%% @end +%%-------------------------------------------------------------------- +add_handler() -> + gen_event:add_handler(?SERVER, ?MODULE, []). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a new event handler is added to an event manager, +%% this function is called to initialize the event handler. +%% +%% @spec init(Args) -> {ok, State} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives an event sent using +%% gen_event:notify/2 or gen_event:sync_notify/2, this function is +%% called for each installed event handler to handle the event. +%% +%% @spec handle_event(Event, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) -> + {ok, State}; +handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) -> + {ok, State}; +handle_event({error, _, Event}, State) -> + {ok, State#state{ errors = [Event|State#state.errors] }}; +handle_event(_Event, State) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives a request sent using +%% gen_event:call/3,4, this function is called for the specified +%% event handler to handle the request. +%% +%% @spec handle_call(Request, State) -> +%% {ok, Reply, State} | +%% {swap_handler, Reply, Args1, State1, Mod2, Args2} | +%% {remove_handler, Reply} +%% @end +%%-------------------------------------------------------------------- +handle_call(get_errors, S) -> + {ok, S#state.errors, S#state{ errors = [] }}; +handle_call(_Request, State) -> + Reply = ok, + {ok, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for each installed event handler when +%% an event manager receives any other message than an event or a +%% synchronous request (or a system message). +%% +%% @spec handle_info(Info, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event handler is deleted from an event manager, this +%% function is called. It should be the opposite of Module:init/1 and +%% do any necessary cleaning up. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/test/pulse_util/lamport_clock.erl b/test/pulse_util/lamport_clock.erl new file mode 100644 index 0000000..0bb8e3d --- /dev/null +++ b/test/pulse_util/lamport_clock.erl @@ -0,0 +1,73 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(lamport_clock). + +-export([init/0, reset/0, get/0, update/1, incr/0]). + +-define(KEY, ?MODULE). + +-ifdef(TEST). + +init() -> + case get(?KEY) of + undefined -> + reset(); + N when is_integer(N) -> + ok + end. + +reset() -> + FakeTOD = 0, + put(?KEY, FakeTOD + 1). + +get() -> + init(), + get(?KEY). + +update(Remote) -> + New = erlang:max(get(?KEY), Remote) + 1, + put(?KEY, New), + New. + +incr() -> + New = get(?KEY) + 1, + put(?KEY, New), + New. + +-else. % TEST + +init() -> + ok. + +reset() -> + ok. + +get() -> + ok. + +update(_) -> + ok. + +incr() -> + ok. + +-endif. % TEST