WIP: a good start, I need to switch over wterl_event_handler to be a

gen_server and I need to add a way to set the pid of the message handler
process to the NIF API.
This commit is contained in:
Gregory Burd 2013-04-22 09:52:21 -04:00
parent 3398540831
commit 652771003e
4 changed files with 220 additions and 15 deletions

View file

@ -32,6 +32,10 @@ extern "C" {
#include "stats.h" // TODO: measure, measure... measure again #include "stats.h" // TODO: measure, measure... measure again
#endif #endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
@ -61,8 +65,12 @@ struct async_nif_state {
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \ struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
__UNUSED(worker_id); \
do work_block while(0); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \ static void fn_post_ ## decl (struct decl ## _args *args) { \
__UNUSED(args); \
do post_block while(0); \ do post_block while(0); \
} \ } \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
@ -80,7 +88,7 @@ struct async_nif_state {
if (async_nif->shutdown) \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { \ if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
@ -172,7 +180,7 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int qid = (hint != -1) ? hint : async_nif->next_q; unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
do { do {
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
@ -193,6 +201,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
} else { } else {
break; break;
} }
// TODO: at some point add in work sheading/stealing
} while(1); } while(1);
/* We hold the queue's lock, and we've seletect a reasonable queue for this /* We hold the queue's lock, and we've seletect a reasonable queue for this
@ -265,6 +274,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int i; unsigned int i;
unsigned int num_queues = async_nif->num_queues; unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
__UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To /* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first ensure that we don't race with the enqueue() process we first
@ -401,6 +411,7 @@ async_nif_load()
static void static void
async_nif_upgrade(ErlNifEnv *env) async_nif_upgrade(ErlNifEnv *env)
{ {
__UNUSED(env);
// TODO: // TODO:
} }

View file

@ -35,6 +35,10 @@
# define dprint(s, ...) {} # define dprint(s, ...) {}
#endif #endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#include "wiredtiger.h" #include "wiredtiger.h"
#include "async_nif.h" #include "async_nif.h"
#include "khash.h" #include "khash.h"
@ -89,6 +93,17 @@ static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_NOT_FOUND; static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_FIRST; static ERL_NIF_TERM ATOM_FIRST;
static ERL_NIF_TERM ATOM_LAST; static ERL_NIF_TERM ATOM_LAST;
static ERL_NIF_TERM ATOM_MESSAGE;
static ERL_NIF_TERM ATOM_PROGRESS;
static ERL_NIF_TERM ATOM_WTERL_VSN;
static ERL_NIF_TERM ATOM_WIREDTIGER_VSN;
static ERL_NIF_TERM ATOM_MSG_PID;
struct wterl_event_handlers {
WT_EVENT_HANDLER handlers;
ErlNifEnv *msg_env;
ErlNifPid to_pid;
};
/* Generators for 'conns' a named, type-specific hash table functions. */ /* Generators for 'conns' a named, type-specific hash table functions. */
KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); KHASH_MAP_INIT_PTR(conns, WterlConnHandle*);
@ -97,12 +112,115 @@ struct wterl_priv_data {
void *async_nif_priv; // Note: must be first element in struct void *async_nif_priv; // Note: must be first element in struct
ErlNifMutex *conns_mutex; ErlNifMutex *conns_mutex;
khash_t(conns) *conns; khash_t(conns) *conns;
struct wterl_event_handlers eh;
char wterl_vsn[512];
char wiredtiger_vsn[512];
}; };
/* Global init for async_nif. */ /* Global init for async_nif. */
ASYNC_NIF_INIT(wterl); ASYNC_NIF_INIT(wterl);
/**
* Callback to handle error messages.
*
* Deliver error messages into Erlang to be logged via loger:error()
* or on failure, write message to stderr.
*
* error a WiredTiger, C99 or POSIX error code, which can
* be converted to a string using wiredtiger_strerror()
* message an error string
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_ERROR,
enif_make_tuple2(msg_env,
enif_make_atom(msg_env, erl_errno_id(error)),
enif_make_string(msg_env, message, ERL_NIF_LATIN1)));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%d] %s\n", error, message);
return 0;
} else {
// output to stderr
return (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO);
}
}
/**
* Callback to handle informational messages.
*
* Deliver informational messages into Erlang to be logged via loger:info()
* or on failure, write message to stdout.
*
* message an informational string
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_MESSAGE,
enif_make_string(msg_env, message, ERL_NIF_LATIN1));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "%s\n", message);
return 0;
} else {
// output to stdout
return (printf("%s\n", message) >= 0 ? 0 : EIO);
}
}
/**
* Callback to handle progress messages.
*
* Deliver progress messages into Erlang to be logged via loger:info()
* or on failure, written message to stdout.
*
* operation a string representation of the operation
* counter a progress counter [0..100]
* -> 0 on success, a non-zero return may cause the WiredTiger
* function posting the event to fail, and may even cause
* operation or library failure.
*/
int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env = eh->msg_env;
ErlNifPid *to_pid = &eh->to_pid;
if (msg_env && to_pid) {
ERL_NIF_TERM msg =
enif_make_tuple2(msg_env, ATOM_PROGRESS,
enif_make_tuple2(msg_env,
enif_make_string(msg_env, operation, ERL_NIF_LATIN1),
enif_make_int64(msg_env, counter)));
enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%ld] %s\n", counter, operation);
return 0;
} else {
// output to stdout
return (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO);
}
}
/** /**
* Open a WT_SESSION for the thread context 'ctx' to use, also init the * Open a WT_SESSION for the thread context 'ctx' to use, also init the
* shared cursor hash table. * shared cursor hash table.
@ -122,7 +240,7 @@ __init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
ctx->cursors = kh_init(cursors); ctx->cursors = kh_init(cursors);
if (!ctx->cursors) { if (!ctx->cursors) {
ctx->session->close(ctx->session); ctx->session->close(ctx->session, NULL);
ctx->session = NULL; ctx->session = NULL;
return ENOMEM; return ENOMEM;
} }
@ -262,6 +380,9 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
static void static void
__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) __release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor)
{ {
__UNUSED(conn_handle);
__UNUSED(worker_id);
__UNUSED(uri);
cursor->reset(cursor); cursor->reset(cursor);
} }
@ -333,7 +454,11 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn);
int rc = wiredtiger_open(args->homedir,
(WT_EVENT_HANDLER*)&args->priv->eh.handlers,
config.data[0] != 0 ? (const char*)config.data : NULL,
&conn);
if (rc == 0) { if (rc == 0) {
WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
if (!conn_handle) { if (!conn_handle) {
@ -1892,6 +2017,9 @@ ASYNC_NIF_DECL(
static int static int
on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
int arity;
ERL_NIF_TERM head, tail;
const ERL_NIF_TERM* option;
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource",
NULL, flags, NULL); NULL, flags, NULL);
@ -1903,25 +2031,82 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_NOT_FOUND = enif_make_atom(env, "not_found"); ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
ATOM_FIRST = enif_make_atom(env, "first"); ATOM_FIRST = enif_make_atom(env, "first");
ATOM_LAST = enif_make_atom(env, "last"); ATOM_LAST = enif_make_atom(env, "last");
ATOM_MESSAGE = enif_make_atom(env, "message");
ATOM_PROGRESS = enif_make_atom(env, "progress");
ATOM_WTERL_VSN = enif_make_atom(env, "wterl_vsn");
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data)); struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
if (!priv) if (!priv)
return ENOMEM; return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
priv->conns_mutex = enif_mutex_create(NULL);
priv->conns = kh_init(conns);
if (!priv->conns) {
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
struct wterl_event_handlers *eh = &priv->eh;
eh->msg_env = enif_alloc_env();
if (!eh->msg_env) {
kh_destroy(conns, priv->conns);
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
eh->handlers.handle_error = __wterl_error_handler;
eh->handlers.handle_message = __wterl_message_handler;
eh->handlers.handle_progress = __wterl_progress_handler;
/* Process the load_info array of tuples, we expect:
[{wterl_vsn, "a version string"},
{wiredtiger_vsn, "a version string"},
{message_pid, ERL_NIF_TERM pid}].
*/
while (enif_get_list_cell(env, load_info, &head, &tail)) {
if (enif_get_tuple(env, head, &arity, &option)) {
if (arity == 2) {
if (enif_is_identical(option[0], ATOM_WTERL_VSN)) {
enif_get_string(env, option[1], priv->wterl_vsn, sizeof(priv->wterl_vsn), ERL_NIF_LATIN1);
} else if (enif_is_identical(option[0], ATOM_WIREDTIGER_VSN)) {
enif_get_string(env, option[1], priv->wiredtiger_vsn, sizeof(priv->wiredtiger_vsn), ERL_NIF_LATIN1);
} else if (enif_is_identical(option[0], ATOM_MSG_PID)) {
if (!enif_get_local_pid(env, option[1], &priv->eh.to_pid))
return -1;
}
}
}
load_info = tail;
}
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) {
priv->conns_mutex = enif_mutex_create(NULL); enif_free_env(eh->msg_env);
priv->conns = kh_init(conns); kh_destroy(conns, priv->conns);
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
return ENOMEM;
}
*priv_data = priv; *priv_data = priv;
return *priv_data ? 0 : ENOMEM;
char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
return 0;
} }
static int static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
__UNUSED(env);
__UNUSED(priv_data);
__UNUSED(load_info);
return 0; // TODO: implement return 0; // TODO: implement
} }
@ -1985,6 +2170,10 @@ on_unload(ErlNifEnv *env, void *priv_data)
} }
} }
/* At this point all WiredTiger state and threads are free'd/stopped so there
is no chance that the event handler functions will be called so we can
be sure that there won't be a race on eh.msg_env in the callback functions. */
enif_free_env(priv->eh.msg_env);
kh_destroy(conns, h); kh_destroy(conns, h);
enif_mutex_unlock(priv->conns_mutex); enif_mutex_unlock(priv->conns_mutex);
enif_mutex_destroy(priv->conns_mutex); enif_mutex_destroy(priv->conns_mutex);
@ -1994,6 +2183,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
static int static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{ {
__UNUSED(priv_data);
__UNUSED(old_priv_data);
__UNUSED(load_info);
ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement
return 0; return 0;
} }

View file

@ -417,9 +417,9 @@ establish_connection(Config, Type) ->
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(verbose, Config, [ wterl:config_value(verbose, Config, [
%"ckpt" "block", "shared_cache", "evictserver", "fileops", "ckpt" "block", "shared_cache", "evictserver", "fileops",
%"hazard", "mutex", "read", "readserver", "reconcile", "hazard", "mutex", "read", "readserver", "reconcile",
%"salvage", "verify", "write", "evict", "lsm" "salvage", "verify", "write", "evict", "lsm"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec

View file

@ -94,9 +94,11 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
MsgPid = wterl_event_handler:start(),
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl, "07061ed6e8252543c2f06b81a646eca6945cc558"}, [{wterl_vsn, "a1459ce"},
{wiredtiger, "6f7a4b961c744bfb21f0c21d4c28c2d162400f1b"}]). {wiredtiger_vsn, "1.5.2-2-g8f2685b"},
{message_pid, MsgPid}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -862,7 +864,7 @@ prop_put_delete() ->
DataDir = "test/wterl.putdelete.qc", DataDir = "test/wterl.putdelete.qc",
Table = "table:eqc", Table = "table:eqc",
{ok, CWD} = file:get_cwd(), {ok, CWD} = file:get_cwd(),
?cmd("rm -rf "++DataDir), rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
ok = filelib:ensure_dir(filename:join([DataDir, "x"])), ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
{ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]), {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]),
try try