Replace all enif implementations of mutexes and conditions with their POSIX pthread equivalent on the theory that we don't want to be bumping heads with the Erlang runtime.

This commit is contained in:
Gregory Burd 2013-08-02 14:20:04 -04:00
parent e9b1a9ea0b
commit ef3bc102f2

View file

@ -25,6 +25,7 @@
#include <stdarg.h>
#include <inttypes.h>
#include <errno.h>
#include <pthread.h>
#include "wiredtiger.h"
@ -57,7 +58,7 @@ typedef struct wterl_conn {
WT_CONNECTION *conn;
const char *session_config;
STAILQ_HEAD(ctxs, wterl_ctx) cache;
ErlNifMutex *cache_mutex;
pthread_mutex_t cache_mutex;
uint32_t cache_size;
} WterlConnHandle;
@ -69,11 +70,11 @@ typedef struct {
struct wterl_event_handlers {
WT_EVENT_HANDLER handlers;
ErlNifEnv *msg_env_error;
ErlNifMutex *error_mutex;
pthread_mutex_t error_mutex;
ErlNifEnv *msg_env_message;
ErlNifMutex *message_mutex;
pthread_mutex_t message_mutex;
ErlNifEnv *msg_env_progress;
ErlNifMutex *progress_mutex;
pthread_mutex_t progress_mutex;
ErlNifPid to_pid;
};
@ -204,7 +205,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
if (c->session)
c->session->close(c->session, NULL);
enif_free(c);
free(c);
num_evicted++;
}
}
@ -226,7 +227,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
{
struct wterl_ctx *c;
enif_mutex_lock(conn_handle->cache_mutex);
pthread_mutex_lock(&conn_handle->cache_mutex);
c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) {
if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs
@ -237,7 +238,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
}
c = STAILQ_NEXT(c, entries);
}
enif_mutex_unlock(conn_handle->cache_mutex);
pthread_mutex_unlock(&conn_handle->cache_mutex);
DPRINTF("cache_find: [%u] %s (%p)", conn_handle->cache_size, c ? "hit" : "miss", c);
return c;
}
@ -251,7 +252,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
static void
__ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
{
enif_mutex_lock(conn_handle->cache_mutex);
pthread_mutex_lock(&conn_handle->cache_mutex);
__ctx_cache_evict(conn_handle);
STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries);
conn_handle->cache_size += 1;
@ -262,7 +263,7 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
sz++;
}
#endif
enif_mutex_unlock(conn_handle->cache_mutex);
pthread_mutex_unlock(&conn_handle->cache_mutex);
DPRINTF("cache_add: [%u:%u] (%p)", sz, conn_handle->cache_size, c);
}
@ -333,7 +334,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) return rc;
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len;
c = enif_alloc(s); // TODO: enif_alloc_resource()
c = malloc(s); // TODO: enif_alloc_resource()
if (c == NULL) {
session->close(session, NULL);
return ENOMEM;
@ -355,7 +356,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
c->ci[i].config = __copy_str_into(&p, config);
rc = session->open_cursor(session, uri, NULL, config, &c->ci[i].cursor);
if (rc != 0) {
enif_free(c);
free(c);
session->close(session, NULL); // this will free the cursors too
va_end(ap);
return rc;
@ -391,7 +392,7 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
/**
* Close all sessions and all cursors open on any objects.
*
* Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex)
* Note: always call within pthread_mutex_lock/unlock(conn_handle->cache_mutex)
*/
void
__close_all_sessions(WterlConnHandle *conn_handle)
@ -405,7 +406,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
c = n;
}
}
@ -413,7 +414,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
/**
* Close cursors open on 'uri' object.
*
* Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex)
* Note: always call within pthread_mutex_lock/unlock(conn_handle->cache_mutex)
*/
void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
@ -431,7 +432,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
break;
}
}
@ -461,7 +462,7 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->error_mutex);
pthread_mutex_lock(&eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
if (msg_env) {
@ -476,7 +477,7 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
} else {
rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->error_mutex);
pthread_mutex_unlock(&eh->error_mutex);
return rc;
}
@ -499,7 +500,7 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->message_mutex);
pthread_mutex_lock(&eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
if (msg_env) {
@ -512,7 +513,7 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
} else {
rc = (printf("%s\n", message) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->message_mutex);
pthread_mutex_unlock(&eh->message_mutex);
return rc;
}
@ -536,7 +537,7 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6
ErlNifPid *to_pid;
int rc = 0;
enif_mutex_lock(eh->progress_mutex);
pthread_mutex_lock(&eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
if (msg_env) {
@ -551,7 +552,7 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6
} else {
rc = (printf("[%llu] %s\n", PRIuint64(counter), operation) >= 0 ? 0 : EIO);
}
enif_mutex_unlock(eh->progress_mutex);
pthread_mutex_unlock(&eh->progress_mutex);
return rc;
}
@ -637,7 +638,7 @@ ASYNC_NIF_DECL(
return;
}
if (session_config.size > 1) {
char *sc = enif_alloc(session_config.size);
char *sc = malloc(session_config.size);
if (!sc) {
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM));
@ -648,8 +649,8 @@ ASYNC_NIF_DECL(
} else {
conn_handle->session_config = NULL;
}
conn_handle->cache_mutex = enif_mutex_create("conn_handle");
enif_mutex_lock(conn_handle->cache_mutex);
pthread_mutex_init(&conn_handle->cache_mutex, 0);
pthread_mutex_lock(&conn_handle->cache_mutex);
conn_handle->conn = conn;
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
@ -658,7 +659,7 @@ ASYNC_NIF_DECL(
conn_handle->cache_size = 0;
enif_release_resource(conn_handle);
enif_mutex_unlock(conn_handle->cache_mutex);
pthread_mutex_unlock(&conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
}
else
@ -695,16 +696,16 @@ ASYNC_NIF_DECL(
{ // work
/* Free up the shared sessions and cursors. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle);
if (args->conn_handle->session_config) {
enif_free((char *)args->conn_handle->session_config);
free((char *)args->conn_handle->session_config);
args->conn_handle->session_config = NULL;
}
WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
enif_mutex_destroy(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
pthread_mutex_destroy(&args->conn_handle->cache_mutex);
memset(args->conn_handle, 0, sizeof(WterlConnHandle));
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
@ -800,12 +801,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -817,7 +818,7 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -827,7 +828,7 @@ ASYNC_NIF_DECL(
this will result in EBUSY(16) "Device or resource busy". */
rc = session->drop(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -867,12 +868,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_cursors_on(args->conn_handle, args->oldname);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -884,7 +885,7 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -895,7 +896,7 @@ ASYNC_NIF_DECL(
this will result in EBUSY(16) "Device or resource busy". */
rc = session->rename(session, args->oldname, args->newname, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -935,12 +936,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -952,14 +953,14 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = session->salvage(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1067,12 +1068,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1085,7 +1086,7 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1100,7 +1101,7 @@ ASYNC_NIF_DECL(
mess. */
if (!args->from_first) {
if (!enif_inspect_binary(env, args->start, &start_key)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1108,7 +1109,7 @@ ASYNC_NIF_DECL(
rc = session->open_cursor(session, args->uri, NULL, "raw", &start);
if (rc != 0) {
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1118,7 +1119,7 @@ ASYNC_NIF_DECL(
if (rc != 0) {
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1133,7 +1134,7 @@ ASYNC_NIF_DECL(
if (!enif_inspect_binary(env, args->stop, &stop_key)) {
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1142,7 +1143,7 @@ ASYNC_NIF_DECL(
if (rc != 0) {
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1153,7 +1154,7 @@ ASYNC_NIF_DECL(
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1171,7 +1172,7 @@ ASYNC_NIF_DECL(
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1208,12 +1209,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1225,14 +1226,14 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = session->upgrade(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1270,12 +1271,12 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->cache_mutex);
pthread_mutex_lock(&args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1287,14 +1288,14 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = session->verify(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->cache_mutex);
pthread_mutex_unlock(&args->conn_handle->cache_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -2220,13 +2221,13 @@ static void __wterl_conn_dtor(ErlNifEnv* env, void* obj)
UNUSED(env);
WterlConnHandle *conn_handle = (WterlConnHandle *)obj;
if (conn_handle->cache_mutex) {
if (conn_handle->conn) {
DPRINTF("conn_handle dtor free'ing (%p)", obj);
enif_mutex_lock(conn_handle->cache_mutex);
pthread_mutex_lock(&conn_handle->cache_mutex);
__close_all_sessions(conn_handle);
conn_handle->conn->close(conn_handle->conn, NULL);
enif_mutex_unlock(conn_handle->cache_mutex);
enif_mutex_destroy(conn_handle->cache_mutex);
pthread_mutex_unlock(&conn_handle->cache_mutex);
pthread_mutex_destroy(&conn_handle->cache_mutex);
}
}
@ -2265,15 +2266,15 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
struct wterl_priv_data *priv = malloc(sizeof(struct wterl_priv_data));
if (!priv)
return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data));
struct wterl_event_handlers *eh = &priv->eh;
eh->error_mutex = enif_mutex_create("error_mutex");
eh->message_mutex = enif_mutex_create("message_mutex");
eh->progress_mutex = enif_mutex_create("progress_mutex");
pthread_mutex_init(&eh->error_mutex, 0);
pthread_mutex_init(&eh->message_mutex, 0);
pthread_mutex_init(&eh->progress_mutex, 0);
/* Process the load_info array of tuples, we expect:
[{wterl_vsn, "a version string"},
@ -2296,7 +2297,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) {
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
return ENOMEM;
}
*priv_data = priv;
@ -2331,9 +2332,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
is no chance that the event handler functions will be called so we can
be sure that there won't be a race on eh.msg_env in the callback functions. */
struct wterl_event_handlers *eh = &priv->eh;
enif_mutex_destroy(eh->error_mutex);
enif_mutex_destroy(eh->message_mutex);
enif_mutex_destroy(eh->progress_mutex);
pthread_mutex_destroy(&eh->error_mutex);
pthread_mutex_destroy(&eh->message_mutex);
pthread_mutex_destroy(&eh->progress_mutex);
if (eh->msg_env_message)
enif_free_env(eh->msg_env_message);
if (eh->msg_env_error)
@ -2342,7 +2343,7 @@ on_unload(ErlNifEnv *env, void *priv_data)
enif_free_env(eh->msg_env_progress);
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
priv_data = NULL;
}