diff --git a/c_src/wterl.c b/c_src/wterl.c index 1e71186..2890f7c 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -100,9 +100,14 @@ static ERL_NIF_TERM ATOM_WIREDTIGER_VSN; static ERL_NIF_TERM ATOM_MSG_PID; struct wterl_event_handlers { - WT_EVENT_HANDLER handlers; - ErlNifEnv *msg_env; - ErlNifPid to_pid; + WT_EVENT_HANDLER handlers; + ErlNifEnv *msg_env_error; + ErlNifMutex *error_mutex; + ErlNifEnv *msg_env_message; + ErlNifMutex *message_mutex; + ErlNifEnv *msg_env_progress; + ErlNifMutex *progress_mutex; + ErlNifPid to_pid; }; /* Generators for 'conns' a named, type-specific hash table functions. */ @@ -138,9 +143,14 @@ int __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message) { struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; - ErlNifEnv *msg_env = eh->msg_env; - ErlNifPid *to_pid = &eh->to_pid; - if (msg_env && to_pid) { + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->error_mutex); + msg_env = eh->msg_env_error; + to_pid = &eh->to_pid; + if (msg_env) { ERL_NIF_TERM msg = enif_make_tuple2(msg_env, ATOM_ERROR, enif_make_tuple2(msg_env, @@ -149,11 +159,11 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message) enif_clear_env(msg_env); if (!enif_send(NULL, to_pid, msg_env, msg)) fprintf(stderr, "[%d] %s\n", error, message); - return 0; } else { - // output to stderr - return (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO); + rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO); } + enif_mutex_unlock(eh->error_mutex); + return rc; } /** @@ -171,20 +181,25 @@ int __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message) { struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; - ErlNifEnv *msg_env = eh->msg_env; - ErlNifPid *to_pid = &eh->to_pid; - if (msg_env && to_pid) { + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->message_mutex); + msg_env = eh->msg_env_message; + to_pid = &eh->to_pid; + if (msg_env) { ERL_NIF_TERM msg = enif_make_tuple2(msg_env, ATOM_MESSAGE, enif_make_string(msg_env, message, ERL_NIF_LATIN1)); enif_clear_env(msg_env); if (!enif_send(NULL, to_pid, msg_env, msg)) fprintf(stderr, "%s\n", message); - return 0; } else { - // output to stdout - return (printf("%s\n", message) >= 0 ? 0 : EIO); + rc = (printf("%s\n", message) >= 0 ? 0 : EIO); } + enif_mutex_unlock(eh->message_mutex); + return rc; } /** @@ -203,9 +218,14 @@ int __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter) { struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; - ErlNifEnv *msg_env = eh->msg_env; - ErlNifPid *to_pid = &eh->to_pid; - if (msg_env && to_pid) { + ErlNifEnv *msg_env; + ErlNifPid *to_pid; + int rc = 0; + + enif_mutex_lock(eh->progress_mutex); + msg_env = eh->msg_env_progress; + to_pid = &eh->to_pid; + if (msg_env) { ERL_NIF_TERM msg = enif_make_tuple2(msg_env, ATOM_PROGRESS, enif_make_tuple2(msg_env, @@ -214,11 +234,11 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6 enif_clear_env(msg_env); if (!enif_send(NULL, to_pid, msg_env, msg)) fprintf(stderr, "[%ld] %s\n", counter, operation); - return 0; } else { - // output to stdout - return (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO); + rc = (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO); } + enif_mutex_unlock(eh->progress_mutex); + return rc; } /** @@ -2003,6 +2023,37 @@ ASYNC_NIF_DECL( enif_release_resource((void*)args->cursor_handle); }); + +/** + * Called by wterl_event_handler to set the pid for message delivery. + */ +static ERL_NIF_TERM +wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + struct wterl_priv_data *priv = enif_priv_data(env); + struct wterl_event_handlers *eh = &priv->eh; + + if (!(argc == 1 && enif_is_pid(env, argv[0]))) { + return enif_make_badarg(env); + } + if (enif_get_local_pid(env, argv[0], &eh->to_pid)) { + if (!eh->msg_env_message) + eh->msg_env_message = enif_alloc_env(); // TOOD: if (!eh->msg_env) { return ENOMEM; } + if (!eh->msg_env_error) + eh->msg_env_error = enif_alloc_env(); + if (!eh->msg_env_progress) + eh->msg_env_progress = enif_alloc_env(); + + eh->handlers.handle_error = __wterl_error_handler; + eh->handlers.handle_message = __wterl_message_handler; + eh->handlers.handle_progress = __wterl_progress_handler; + } else { + memset(&eh->to_pid, 0, sizeof(ErlNifPid)); + } + return ATOM_OK; +} + + /** * Called as this driver is loaded by the Erlang BEAM runtime triggered by the * module's on_load directive. @@ -2051,22 +2102,13 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) } struct wterl_event_handlers *eh = &priv->eh; - eh->msg_env = enif_alloc_env(); - if (!eh->msg_env) { - kh_destroy(conns, priv->conns); - enif_mutex_destroy(priv->conns_mutex); - enif_free(priv); - return ENOMEM; - } - eh->handlers.handle_error = __wterl_error_handler; - eh->handlers.handle_message = __wterl_message_handler; - eh->handlers.handle_progress = __wterl_progress_handler; + eh->error_mutex = enif_mutex_create(NULL); + eh->message_mutex = enif_mutex_create(NULL); + eh->progress_mutex = enif_mutex_create(NULL); /* Process the load_info array of tuples, we expect: [{wterl_vsn, "a version string"}, - {wiredtiger_vsn, "a version string"}, - {message_pid, ERL_NIF_TERM pid}]. - */ + {wiredtiger_vsn, "a version string"}]. */ while (enif_get_list_cell(env, load_info, &head, &tail)) { if (enif_get_tuple(env, head, &arity, &option)) { if (arity == 2) { @@ -2074,9 +2116,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) enif_get_string(env, option[1], priv->wterl_vsn, sizeof(priv->wterl_vsn), ERL_NIF_LATIN1); } else if (enif_is_identical(option[0], ATOM_WIREDTIGER_VSN)) { enif_get_string(env, option[1], priv->wiredtiger_vsn, sizeof(priv->wiredtiger_vsn), ERL_NIF_LATIN1); - } else if (enif_is_identical(option[0], ATOM_MSG_PID)) { - if (!enif_get_local_pid(env, option[1], &priv->eh.to_pid)) - return -1; } } } @@ -2087,7 +2126,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) pointer to the async_nif's private data which we set here. */ ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); if (!priv->async_nif_priv) { - enif_free_env(eh->msg_env); kh_destroy(conns, priv->conns); enif_mutex_destroy(priv->conns_mutex); enif_free(priv); @@ -2173,7 +2211,17 @@ on_unload(ErlNifEnv *env, void *priv_data) /* At this point all WiredTiger state and threads are free'd/stopped so there is no chance that the event handler functions will be called so we can be sure that there won't be a race on eh.msg_env in the callback functions. */ - enif_free_env(priv->eh.msg_env); + struct wterl_event_handlers *eh = &priv->eh; + enif_mutex_destroy(eh->error_mutex); + enif_mutex_destroy(eh->message_mutex); + enif_mutex_destroy(eh->progress_mutex); + if (eh->msg_env_message) + enif_free_env(eh->msg_env_message); + if (eh->msg_env_error) + enif_free_env(eh->msg_env_error); + if (eh->msg_env_progress) + enif_free_env(eh->msg_env_progress); + kh_destroy(conns, h); enif_mutex_unlock(priv->conns_mutex); enif_mutex_destroy(priv->conns_mutex); @@ -2222,6 +2270,7 @@ static ErlNifFunc nif_funcs[] = {"cursor_search_near_nif", 4, wterl_cursor_search_near}, {"cursor_search_nif", 4, wterl_cursor_search}, {"cursor_update_nif", 4, wterl_cursor_update}, + {"set_event_handler_pid", 1, wterl_set_event_handler_pid}, }; ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload); diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index adbb437..3780f91 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -417,7 +417,7 @@ establish_connection(Config, Type) -> wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec wterl:config_value(verbose, Config, [ - "ckpt" "block", "shared_cache", "evictserver", "fileops", + "ckpt", "block", "shared_cache", "evictserver", "fileops", "hazard", "mutex", "read", "readserver", "reconcile", "salvage", "verify", "write", "evict", "lsm" ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec diff --git a/src/wterl.erl b/src/wterl.erl index 03fc623..1b3c3d7 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -67,6 +67,8 @@ fold_keys/3, fold/3]). +-export([set_event_handler_pid/1]). + -include("async_nif.hrl"). -ifdef(TEST). @@ -94,11 +96,8 @@ nif_stub_error(Line) -> -spec init() -> ok | {error, any()}. init() -> - MsgPid = wterl_event_handler:start(), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), - [{wterl_vsn, "a1459ce"}, - {wiredtiger_vsn, "1.5.2-2-g8f2685b"}, - {message_pid, MsgPid}]). + [{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]). -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. @@ -510,6 +509,11 @@ config_to_bin([{Key, Value} | Rest], Acc) -> end. +-spec set_event_handler_pid(pid()) -> ok. +set_event_handler_pid(Pid) + when is_pid(Pid) -> + ?nif_stub. + %% =================================================================== %% EUnit tests diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index b7ead8b..9f62d46 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -84,14 +84,14 @@ init([]) -> handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) -> {Reply, NState} = - case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of - {ok, ConnRef}=OK -> - Monitor = erlang:monitor(process, Caller), - true = ets:insert(wterl_ets, {Monitor, Caller}), - {OK, State#state{conn = ConnRef}}; - Error -> - {Error, State} - end, + case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of + {ok, ConnRef}=OK -> + Monitor = erlang:monitor(process, Caller), + true = ets:insert(wterl_ets, {Monitor, Caller}), + {OK, State#state{conn = ConnRef}}; + Error -> + {Error, State} + end, {reply, Reply, NState}; handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> Monitor = erlang:monitor(process, Caller), diff --git a/src/wterl_sup.erl b/src/wterl_sup.erl index 07bc6cd..8c544ae 100644 --- a/src/wterl_sup.erl +++ b/src/wterl_sup.erl @@ -46,4 +46,5 @@ start_link() -> init([]) -> {ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker), - ?CHILD(wterl_conn, worker)]}}. + ?CHILD(wterl_conn, worker), + ?CHILD(wterl_event_handler, worker)]}}.