Deliver WiredTiger event notifications (messages, errors, progress) into Erlang for logging #7

Merged
gburd merged 9 commits from gsb-event-handlers into master 2013-04-23 19:02:22 +00:00
5 changed files with 107 additions and 53 deletions
Showing only changes of commit fc59ae3f90 - Show all commits

View file

@ -100,9 +100,14 @@ static ERL_NIF_TERM ATOM_WIREDTIGER_VSN;
static ERL_NIF_TERM ATOM_MSG_PID;
struct wterl_event_handlers {
WT_EVENT_HANDLER handlers;
ErlNifEnv *msg_env;
ErlNifPid to_pid;
WT_EVENT_HANDLER handlers;
ErlNifEnv *msg_env_error;
ErlNifMutex *error_mutex;
ErlNifEnv *msg_env_message;
ErlNifMutex *message_mutex;
ErlNifEnv *msg_env_progress;
ErlNifMutex *progress_mutex;
ErlNifPid to_pid;
};
/* Generators for 'conns' a named, type-specific hash table functions. */
@ -138,9 +143,14 @@ int
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_ERROR,
enif_make_tuple2(msg_env,
@ -149,11 +159,11 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%d] %s\n", error, message);
return 0;
} else {
// output to stderr
return (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO);
rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->error_mutex);
return rc;
}
/**
@ -171,20 +181,25 @@ int
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_MESSAGE,
enif_make_string(msg_env, message, ERL_NIF_LATIN1));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "%s\n", message);
return 0;
} else {
// output to stdout
return (printf("%s\n", message) >= 0 ? 0 : EIO);
rc = (printf("%s\n", message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->message_mutex);
return rc;
}
/**
@ -203,9 +218,14 @@ int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_PROGRESS,
enif_make_tuple2(msg_env,
@ -214,11 +234,11 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%ld] %s\n", counter, operation);
return 0;
} else {
// output to stdout
return (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO);
rc = (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->progress_mutex);
return rc;
}
/**
@ -2003,6 +2023,37 @@ ASYNC_NIF_DECL(
enif_release_resource((void*)args->cursor_handle);
});
/**
* Called by wterl_event_handler to set the pid for message delivery.
*/
static ERL_NIF_TERM
wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
struct wterl_priv_data *priv = enif_priv_data(env);
struct wterl_event_handlers *eh = &priv->eh;
if (!(argc == 1 && enif_is_pid(env, argv[0]))) {
return enif_make_badarg(env);
}
if (enif_get_local_pid(env, argv[0], &eh->to_pid)) {
if (!eh->msg_env_message)
eh->msg_env_message = enif_alloc_env(); // TOOD: if (!eh->msg_env) { return ENOMEM; }
if (!eh->msg_env_error)
eh->msg_env_error = enif_alloc_env();
if (!eh->msg_env_progress)
eh->msg_env_progress = enif_alloc_env();
eh->handlers.handle_error = __wterl_error_handler;
eh->handlers.handle_message = __wterl_message_handler;
eh->handlers.handle_progress = __wterl_progress_handler;
} else {
memset(&eh->to_pid, 0, sizeof(ErlNifPid));
}
return ATOM_OK;
}
/**
* Called as this driver is loaded by the Erlang BEAM runtime triggered by the
* module's on_load directive.
@ -2051,22 +2102,13 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
}
struct wterl_event_handlers *eh = &priv->eh;
eh->msg_env = enif_alloc_env();
if (!eh->msg_env) {
kh_destroy(conns, priv->conns);
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
eh->handlers.handle_error = __wterl_error_handler;
eh->handlers.handle_message = __wterl_message_handler;
eh->handlers.handle_progress = __wterl_progress_handler;
eh->error_mutex = enif_mutex_create(NULL);
eh->message_mutex = enif_mutex_create(NULL);
eh->progress_mutex = enif_mutex_create(NULL);
/* Process the load_info array of tuples, we expect:
[{wterl_vsn, "a version string"},
{wiredtiger_vsn, "a version string"},
{message_pid, ERL_NIF_TERM pid}].
*/
{wiredtiger_vsn, "a version string"}]. */
while (enif_get_list_cell(env, load_info, &head, &tail)) {
if (enif_get_tuple(env, head, &arity, &option)) {
if (arity == 2) {
@ -2074,9 +2116,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
enif_get_string(env, option[1], priv->wterl_vsn, sizeof(priv->wterl_vsn), ERL_NIF_LATIN1);
} else if (enif_is_identical(option[0], ATOM_WIREDTIGER_VSN)) {
enif_get_string(env, option[1], priv->wiredtiger_vsn, sizeof(priv->wiredtiger_vsn), ERL_NIF_LATIN1);
} else if (enif_is_identical(option[0], ATOM_MSG_PID)) {
if (!enif_get_local_pid(env, option[1], &priv->eh.to_pid))
return -1;
}
}
}
@ -2087,7 +2126,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) {
enif_free_env(eh->msg_env);
kh_destroy(conns, priv->conns);
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
@ -2173,7 +2211,17 @@ on_unload(ErlNifEnv *env, void *priv_data)
/* At this point all WiredTiger state and threads are free'd/stopped so there
is no chance that the event handler functions will be called so we can
be sure that there won't be a race on eh.msg_env in the callback functions. */
enif_free_env(priv->eh.msg_env);
struct wterl_event_handlers *eh = &priv->eh;
enif_mutex_destroy(eh->error_mutex);
enif_mutex_destroy(eh->message_mutex);
enif_mutex_destroy(eh->progress_mutex);
if (eh->msg_env_message)
enif_free_env(eh->msg_env_message);
if (eh->msg_env_error)
enif_free_env(eh->msg_env_error);
if (eh->msg_env_progress)
enif_free_env(eh->msg_env_progress);
kh_destroy(conns, h);
enif_mutex_unlock(priv->conns_mutex);
enif_mutex_destroy(priv->conns_mutex);
@ -2222,6 +2270,7 @@ static ErlNifFunc nif_funcs[] =
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
{"cursor_search_nif", 4, wterl_cursor_search},
{"cursor_update_nif", 4, wterl_cursor_update},
{"set_event_handler_pid", 1, wterl_set_event_handler_pid},
};
ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload);

View file

@ -417,7 +417,7 @@ establish_connection(Config, Type) ->
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(verbose, Config, [
"ckpt" "block", "shared_cache", "evictserver", "fileops",
"ckpt", "block", "shared_cache", "evictserver", "fileops",
"hazard", "mutex", "read", "readserver", "reconcile",
"salvage", "verify", "write", "evict", "lsm"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec

View file

@ -67,6 +67,8 @@
fold_keys/3,
fold/3]).
-export([set_event_handler_pid/1]).
-include("async_nif.hrl").
-ifdef(TEST).
@ -94,11 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}.
init() ->
MsgPid = wterl_event_handler:start(),
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "a1459ce"},
{wiredtiger_vsn, "1.5.2-2-g8f2685b"},
{message_pid, MsgPid}]).
[{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -510,6 +509,11 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
end.
-spec set_event_handler_pid(pid()) -> ok.
set_event_handler_pid(Pid)
when is_pid(Pid) ->
?nif_stub.
%% ===================================================================
%% EUnit tests

View file

@ -84,14 +84,14 @@ init([]) ->
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} =
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
{ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
{OK, State#state{conn = ConnRef}};
Error ->
{Error, State}
end,
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
{ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
{OK, State#state{conn = ConnRef}};
Error ->
{Error, State}
end,
{reply, Reply, NState};
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller),

View file

@ -46,4 +46,5 @@ start_link() ->
init([]) ->
{ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker),
?CHILD(wterl_conn, worker)]}}.
?CHILD(wterl_conn, worker),
?CHILD(wterl_event_handler, worker)]}}.