Merge pull request #7 from basho-labs/gsb-event-handlers

Deliver WiredTiger event notifications (messages, errors, progress) into Erlang for logging
This commit is contained in:
Gregory Burd 2013-04-23 12:02:22 -07:00
commit 54c5158490
10 changed files with 447 additions and 59 deletions

View file

@ -5,12 +5,18 @@ REBAR= ./rebar
ERL= /usr/bin/env erl ERL= /usr/bin/env erl
ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec
DIALYZER= /usr/bin/env dialyzer DIALYZER= /usr/bin/env dialyzer
ARCHIVETAG?= $(shell git describe --always --long --tags)
ARCHIVE?= $(shell basename $(CURDIR))-$(ARCHIVETAG)
WT_ARCHIVETAG?= $(shell cd c_src/wiredtiger-basho; git describe --always --long --tags)
.PHONY: plt analyze all deps compile get-deps clean .PHONY: plt analyze all deps compile get-deps clean
all: compile all: compile
archive:
@rm -f $(ARCHIVE).tar.gz
git archive --format=tar --prefix=$(ARCHIVE)/ $(ARCHIVETAG) | gzip >$(ARCHIVE).tar.gz
deps: get-deps deps: get-deps
get-deps: get-deps:
@ -33,34 +39,52 @@ compile: c_src/wterl.o ebin/app_helper.beam
@$(REBAR) compile @$(REBAR) compile
clean: clean:
@rm -f $(ARCHIVE).tar.gz
@$(REBAR) clean @$(REBAR) clean
xref:
@$(REBAR) xref skip_deps=true
test: eunit test: eunit
eunit: compile eunit: compile-for-eunit
@$(REBAR) eunit skip_deps=true @$(REBAR) eunit skip_deps=true
eunit_console: eqc: compile-for-eqc
@$(ERL) -pa .eunit deps/lager/ebin @$(REBAR) eqc skip_deps=true
proper: compile-for-proper
@echo "rebar does not implement a 'proper' command" && false
triq: compile-for-triq
@$(REBAR) triq skip_deps=true
compile-for-eunit:
@$(REBAR) compile eunit compile_only=true
compile-for-eqc:
@$(REBAR) -D QC -D QC_EQC compile eqc compile_only=true
compile-for-eqcmini:
@$(REBAR) -D QC -D QC_EQCMINI compile eqc compile_only=true
compile-for-proper:
@$(REBAR) -D QC -D QC_PROPER compile eqc compile_only=true
compile-for-triq:
@$(REBAR) -D QC -D QC_TRIQ compile triq compile_only=true
plt: compile plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib @$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib
analyze: compile analyze: compile
$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin @$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin
repl: repl:
$(ERL) -pz deps/lager/ebin -pa ebin @$(ERL) -pa ebin -pz deps/lager/ebin
gdb-repl:
USE_GDB=1 $(ERL) -pz deps/lager/ebin -pa ebin
eunit-repl: eunit-repl:
$(ERL) -pz deps/lager/ebin -pa ebin -pa .eunit @$(ERL) -pa .eunit deps/lager/ebin
gdb-eunit-repl:
USE_GDB=1 $(ERL) -pa .eunit -pz deps/lager/ebin -pz ebin -exec 'cd(".eunit").'
ERL_TOP= /home/gburd/eng/otp_R15B01 ERL_TOP= /home/gburd/eng/otp_R15B01
CERL= ${ERL_TOP}/bin/cerl CERL= ${ERL_TOP}/bin/cerl

View file

@ -32,6 +32,10 @@ extern "C" {
#include "stats.h" // TODO: measure, measure... measure again #include "stats.h" // TODO: measure, measure... measure again
#endif #endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
@ -61,8 +65,12 @@ struct async_nif_state {
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \ struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
__UNUSED(worker_id); \
do work_block while(0); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \ static void fn_post_ ## decl (struct decl ## _args *args) { \
__UNUSED(args); \
do post_block while(0); \ do post_block while(0); \
} \ } \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
@ -80,7 +88,7 @@ struct async_nif_state {
if (async_nif->shutdown) \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { \ if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
@ -172,7 +180,7 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int qid = (hint != -1) ? hint : async_nif->next_q; unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
do { do {
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
@ -193,6 +201,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
} else { } else {
break; break;
} }
// TODO: at some point add in work sheading/stealing
} while(1); } while(1);
/* We hold the queue's lock, and we've seletect a reasonable queue for this /* We hold the queue's lock, and we've seletect a reasonable queue for this
@ -265,6 +274,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int i; unsigned int i;
unsigned int num_queues = async_nif->num_queues; unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
__UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To /* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first ensure that we don't race with the enqueue() process we first
@ -296,6 +306,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
/* Worker threads are stopped, now toss anything left in the queue. */ /* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
fifo_q_foreach(reqs, q->reqs, req, { fifo_q_foreach(reqs, q->reqs, req, {
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown"))); enif_make_atom(req->env, "shutdown")));
@ -400,6 +411,7 @@ async_nif_load()
static void static void
async_nif_upgrade(ErlNifEnv *env) async_nif_upgrade(ErlNifEnv *env)
{ {
__UNUSED(env);
// TODO: // TODO:
} }

View file

@ -28,7 +28,7 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make} MAKE=${MAKE:-make}
export CFLAGS="$CFLAGS -g -I $BASEDIR/system/include" export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include" export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH" export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"

View file

@ -35,6 +35,10 @@
# define dprint(s, ...) {} # define dprint(s, ...) {}
#endif #endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#include "wiredtiger.h" #include "wiredtiger.h"
#include "async_nif.h" #include "async_nif.h"
#include "khash.h" #include "khash.h"
@ -89,6 +93,22 @@ static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_NOT_FOUND; static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_FIRST; static ERL_NIF_TERM ATOM_FIRST;
static ERL_NIF_TERM ATOM_LAST; static ERL_NIF_TERM ATOM_LAST;
static ERL_NIF_TERM ATOM_MESSAGE;
static ERL_NIF_TERM ATOM_PROGRESS;
static ERL_NIF_TERM ATOM_WTERL_VSN;
static ERL_NIF_TERM ATOM_WIREDTIGER_VSN;
static ERL_NIF_TERM ATOM_MSG_PID;
struct wterl_event_handlers {
WT_EVENT_HANDLER handlers;
ErlNifEnv *msg_env_error;
ErlNifMutex *error_mutex;
ErlNifEnv *msg_env_message;
ErlNifMutex *message_mutex;
ErlNifEnv *msg_env_progress;
ErlNifMutex *progress_mutex;
ErlNifPid to_pid;
};
/* Generators for 'conns' a named, type-specific hash table functions. */ /* Generators for 'conns' a named, type-specific hash table functions. */
KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); KHASH_MAP_INIT_PTR(conns, WterlConnHandle*);
@ -97,12 +117,130 @@ struct wterl_priv_data {
void *async_nif_priv; // Note: must be first element in struct void *async_nif_priv; // Note: must be first element in struct
ErlNifMutex *conns_mutex; ErlNifMutex *conns_mutex;
khash_t(conns) *conns; khash_t(conns) *conns;
struct wterl_event_handlers eh;
char wterl_vsn[512];
char wiredtiger_vsn[512];
}; };
/* Global init for async_nif. */ /* Global init for async_nif. */
ASYNC_NIF_INIT(wterl); ASYNC_NIF_INIT(wterl);
/**
* Callback to handle error messages.
*
* Deliver error messages into Erlang to be logged via loger:error()
* or on failure, write message to stderr.
*
* error a WiredTiger, C99 or POSIX error code, which can
* be converted to a string using wiredtiger_strerror()
* message an error string
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_ERROR,
enif_make_tuple2(msg_env,
enif_make_atom(msg_env, erl_errno_id(error)),
enif_make_string(msg_env, message, ERL_NIF_LATIN1)));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%d] %s\n", error, message);
} else {
rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->error_mutex);
return rc;
}
/**
* Callback to handle informational messages.
*
* Deliver informational messages into Erlang to be logged via loger:info()
* or on failure, write message to stdout.
*
* message an informational string
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_MESSAGE,
enif_make_string(msg_env, message, ERL_NIF_LATIN1));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "%s\n", message);
} else {
rc = (printf("%s\n", message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->message_mutex);
return rc;
}
/**
* Callback to handle progress messages.
*
* Deliver progress messages into Erlang to be logged via loger:info()
* or on failure, written message to stdout.
*
* operation a string representation of the operation
* counter a progress counter [0..100]
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
if (msg_env) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_PROGRESS,
enif_make_tuple2(msg_env,
enif_make_string(msg_env, operation, ERL_NIF_LATIN1),
enif_make_int64(msg_env, counter)));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%ld] %s\n", counter, operation);
} else {
rc = (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->progress_mutex);
return rc;
}
/** /**
* Open a WT_SESSION for the thread context 'ctx' to use, also init the * Open a WT_SESSION for the thread context 'ctx' to use, also init the
* shared cursor hash table. * shared cursor hash table.
@ -119,7 +257,13 @@ __init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
ctx->session = NULL; ctx->session = NULL;
return rc; return rc;
} }
ctx->cursors = kh_init(cursors); ctx->cursors = kh_init(cursors);
if (!ctx->cursors) {
ctx->session->close(ctx->session, NULL);
ctx->session = NULL;
return ENOMEM;
}
return 0; return 0;
} }
@ -256,6 +400,9 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
static void static void
__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) __release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor)
{ {
__UNUSED(conn_handle);
__UNUSED(worker_id);
__UNUSED(uri);
cursor->reset(cursor); cursor->reset(cursor);
} }
@ -327,7 +474,11 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn);
int rc = wiredtiger_open(args->homedir,
(WT_EVENT_HANDLER*)&args->priv->eh.handlers,
config.data[0] != 0 ? (const char*)config.data : NULL,
&conn);
if (rc == 0) { if (rc == 0) {
WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
if (!conn_handle) { if (!conn_handle) {
@ -847,14 +998,6 @@ ASYNC_NIF_DECL(
item_start.data = start_key.data; item_start.data = start_key.data;
item_start.size = start_key.size; item_start.size = start_key.size;
start->set_key(start, item_start); start->set_key(start, item_start);
rc = start->search(start);
if (rc != 0) {
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
} }
if (!args->to_last) { if (!args->to_last) {
@ -890,15 +1033,6 @@ ASYNC_NIF_DECL(
item_stop.data = stop_key.data; item_stop.data = stop_key.data;
item_stop.size = stop_key.size; item_stop.size = stop_key.size;
stop->set_key(stop, item_stop); stop->set_key(stop, item_stop);
rc = stop->search(stop);
if (rc != 0) {
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
} }
/* Always pass NULL for URI here because we always specify the range with the /* Always pass NULL for URI here because we always specify the range with the
@ -1889,6 +2023,37 @@ ASYNC_NIF_DECL(
enif_release_resource((void*)args->cursor_handle); enif_release_resource((void*)args->cursor_handle);
}); });
/**
* Called by wterl_event_handler to set the pid for message delivery.
*/
static ERL_NIF_TERM
wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
struct wterl_priv_data *priv = enif_priv_data(env);
struct wterl_event_handlers *eh = &priv->eh;
if (!(argc == 1 && enif_is_pid(env, argv[0]))) {
return enif_make_badarg(env);
}
if (enif_get_local_pid(env, argv[0], &eh->to_pid)) {
if (!eh->msg_env_message)
eh->msg_env_message = enif_alloc_env(); // TOOD: if (!eh->msg_env) { return ENOMEM; }
if (!eh->msg_env_error)
eh->msg_env_error = enif_alloc_env();
if (!eh->msg_env_progress)
eh->msg_env_progress = enif_alloc_env();
eh->handlers.handle_error = __wterl_error_handler;
eh->handlers.handle_message = __wterl_message_handler;
eh->handlers.handle_progress = __wterl_progress_handler;
} else {
memset(&eh->to_pid, 0, sizeof(ErlNifPid));
}
return ATOM_OK;
}
/** /**
* Called as this driver is loaded by the Erlang BEAM runtime triggered by the * Called as this driver is loaded by the Erlang BEAM runtime triggered by the
* module's on_load directive. * module's on_load directive.
@ -1903,6 +2068,9 @@ ASYNC_NIF_DECL(
static int static int
on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
int arity;
ERL_NIF_TERM head, tail;
const ERL_NIF_TERM* option;
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource",
NULL, flags, NULL); NULL, flags, NULL);
@ -1914,25 +2082,69 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_NOT_FOUND = enif_make_atom(env, "not_found"); ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
ATOM_FIRST = enif_make_atom(env, "first"); ATOM_FIRST = enif_make_atom(env, "first");
ATOM_LAST = enif_make_atom(env, "last"); ATOM_LAST = enif_make_atom(env, "last");
ATOM_MESSAGE = enif_make_atom(env, "message");
ATOM_PROGRESS = enif_make_atom(env, "progress");
ATOM_WTERL_VSN = enif_make_atom(env, "wterl_vsn");
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data)); struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
if (!priv) if (!priv)
return ENOMEM; return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
priv->conns_mutex = enif_mutex_create(NULL);
priv->conns = kh_init(conns);
if (!priv->conns) {
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
struct wterl_event_handlers *eh = &priv->eh;
eh->error_mutex = enif_mutex_create(NULL);
eh->message_mutex = enif_mutex_create(NULL);
eh->progress_mutex = enif_mutex_create(NULL);
/* Process the load_info array of tuples, we expect:
[{wterl_vsn, "a version string"},
{wiredtiger_vsn, "a version string"}]. */
while (enif_get_list_cell(env, load_info, &head, &tail)) {
if (enif_get_tuple(env, head, &arity, &option)) {
if (arity == 2) {
if (enif_is_identical(option[0], ATOM_WTERL_VSN)) {
enif_get_string(env, option[1], priv->wterl_vsn, sizeof(priv->wterl_vsn), ERL_NIF_LATIN1);
} else if (enif_is_identical(option[0], ATOM_WIREDTIGER_VSN)) {
enif_get_string(env, option[1], priv->wiredtiger_vsn, sizeof(priv->wiredtiger_vsn), ERL_NIF_LATIN1);
}
}
}
load_info = tail;
}
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) {
priv->conns_mutex = enif_mutex_create(NULL); kh_destroy(conns, priv->conns);
priv->conns = kh_init(conns); enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
*priv_data = priv; *priv_data = priv;
return *priv_data ? 0 : ENOMEM;
char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
return 0;
} }
static int static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
__UNUSED(env);
__UNUSED(priv_data);
__UNUSED(load_info);
return 0; // TODO: implement return 0; // TODO: implement
} }
@ -1996,6 +2208,20 @@ on_unload(ErlNifEnv *env, void *priv_data)
} }
} }
/* At this point all WiredTiger state and threads are free'd/stopped so there
is no chance that the event handler functions will be called so we can
be sure that there won't be a race on eh.msg_env in the callback functions. */
struct wterl_event_handlers *eh = &priv->eh;
enif_mutex_destroy(eh->error_mutex);
enif_mutex_destroy(eh->message_mutex);
enif_mutex_destroy(eh->progress_mutex);
if (eh->msg_env_message)
enif_free_env(eh->msg_env_message);
if (eh->msg_env_error)
enif_free_env(eh->msg_env_error);
if (eh->msg_env_progress)
enif_free_env(eh->msg_env_progress);
kh_destroy(conns, h); kh_destroy(conns, h);
enif_mutex_unlock(priv->conns_mutex); enif_mutex_unlock(priv->conns_mutex);
enif_mutex_destroy(priv->conns_mutex); enif_mutex_destroy(priv->conns_mutex);
@ -2005,6 +2231,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
static int static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{ {
__UNUSED(priv_data);
__UNUSED(old_priv_data);
__UNUSED(load_info);
ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement
return 0; return 0;
} }
@ -2041,6 +2270,7 @@ static ErlNifFunc nif_funcs[] =
{"cursor_search_near_nif", 4, wterl_cursor_search_near}, {"cursor_search_near_nif", 4, wterl_cursor_search_near},
{"cursor_search_nif", 4, wterl_cursor_search}, {"cursor_search_nif", 4, wterl_cursor_search},
{"cursor_update_nif", 4, wterl_cursor_update}, {"cursor_update_nif", 4, wterl_cursor_update},
{"set_event_handler_pid", 1, wterl_set_event_handler_pid},
}; };
ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload); ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload);

View file

@ -1,7 +1,7 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et %% ex: ft=erlang ts=4 sw=4 et
{require_otp_vsn, "R1[456]"}. {require_otp_vsn, "R1[567]"}.
{cover_enabled, true}. {cover_enabled, true}.
@ -27,7 +27,7 @@
%strict_validation %strict_validation
]}. ]}.
{xref_checks, [undefined_function_calls]}. {xref_checks, [undefined_function_calls, deprecated_function_calls]}.
{deps, [ {deps, [
{lager, "2.*", {git, "git://github.com/basho/lager", {branch, "master"}}} {lager, "2.*", {git, "git://github.com/basho/lager", {branch, "master"}}}
@ -36,7 +36,7 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}. {port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"} {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}. ]}.

View file

@ -416,10 +416,14 @@ establish_connection(Config, Type) ->
wterl:config_value(session_max, Config, max_sessions(Config)), wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(verbose, Config, [ wterl:config_value(verbose, Config, [ "salvage", "verify"
%"ckpt" "block", "shared_cache", "evictserver", "fileops", % Note: for some unknown reason, if you add these additional
%"hazard", "mutex", "read", "readserver", "reconcile", % verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
%"salvage", "verify", "write", "evict", "lsm" % no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec

View file

@ -67,6 +67,8 @@
fold_keys/3, fold_keys/3,
fold/3]). fold/3]).
-export([set_event_handler_pid/1]).
-include("async_nif.hrl"). -include("async_nif.hrl").
-ifdef(TEST). -ifdef(TEST).
@ -95,8 +97,7 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl, "07061ed6e8252543c2f06b81a646eca6945cc558"}, [{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]).
{wiredtiger, "6f7a4b961c744bfb21f0c21d4c28c2d162400f1b"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -508,6 +509,11 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
end. end.
-spec set_event_handler_pid(pid()) -> ok.
set_event_handler_pid(Pid)
when is_pid(Pid) ->
?nif_stub.
%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
@ -862,7 +868,7 @@ prop_put_delete() ->
DataDir = "test/wterl.putdelete.qc", DataDir = "test/wterl.putdelete.qc",
Table = "table:eqc", Table = "table:eqc",
{ok, CWD} = file:get_cwd(), {ok, CWD} = file:get_cwd(),
?cmd("rm -rf "++DataDir), rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
ok = filelib:ensure_dir(filename:join([DataDir, "x"])), ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
{ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]), {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]),
try try

View file

@ -84,14 +84,14 @@ init([]) ->
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) -> handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} = {Reply, NState} =
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
{ok, ConnRef}=OK -> {ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
{OK, State#state{conn = ConnRef}}; {OK, State#state{conn = ConnRef}};
Error -> Error ->
{Error, State} {Error, State}
end, end,
{reply, Reply, NState}; {reply, Reply, NState};
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),

111
src/wterl_event_handler.erl Normal file
View file

@ -0,0 +1,111 @@
%% -------------------------------------------------------------------
%%
%% wterl: Erlang Wrapper for WiredTiger
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(wterl_event_handler).
-behaviour(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% API
-export([start_link/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(PREFIX, "wiredtiger").
%% ====================================================================
%% API
%% ====================================================================
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec stop() -> ok.
stop() ->
gen_server:cast(?MODULE, stop).
%% ====================================================================
%% gen_server callbacks
%% ====================================================================
init([]) ->
wterl:set_event_handler_pid(self()),
{ok, []}.
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({error, {Errno, Message}}, State) ->
log(error, "~s: (~s) ~s", [?PREFIX, Errno, Message]),
{noreply, State};
handle_info({message, Info}, State) ->
log(info, "~s: ~s", [?PREFIX, Info]),
{noreply, State};
handle_info({progress, {Operation, Counter}}, State) ->
log(info, "~s: progress on ~s [~b]", [?PREFIX, Operation, Counter]),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% @private
-spec log(error | info, string(), [any()]) -> ok.
log(Urgency, Format, Args) ->
case proplists:is_defined(lager, application:which_applications()) of
true ->
log(lager, Urgency, Format, Args);
false ->
log(stdio, Urgency, Format, Args)
end.
-spec log(lager | stdio, error | info, string(), [any()]) -> ok.
log(lager, error, Format, Args) ->
lager:error(Format, Args);
log(lager, info, Format, Args) ->
lager:info(Format, Args);
log(stdio, _, Format, Args) ->
io:format(Format ++ "~n", Args).
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
-endif.

View file

@ -46,4 +46,5 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker), {ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker),
?CHILD(wterl_conn, worker)]}}. ?CHILD(wterl_conn, worker),
?CHILD(wterl_event_handler, worker)]}}.