From ef3bc102f2c9af0e6870beab7cae364dd64c226a Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Fri, 2 Aug 2013 14:20:04 -0400 Subject: [PATCH] Replace all enif implementations of mutexes and conditions with their POSIX pthread equivalent on the theory that we don't want to be bumping heads with the Erlang runtime. --- c_src/wterl.c | 145 +++++++++++++++++++++++++------------------------- 1 file changed, 73 insertions(+), 72 deletions(-) diff --git a/c_src/wterl.c b/c_src/wterl.c index e2f4ffa..9131a8f 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "wiredtiger.h" @@ -57,7 +58,7 @@ typedef struct wterl_conn { WT_CONNECTION *conn; const char *session_config; STAILQ_HEAD(ctxs, wterl_ctx) cache; - ErlNifMutex *cache_mutex; + pthread_mutex_t cache_mutex; uint32_t cache_size; } WterlConnHandle; @@ -69,11 +70,11 @@ typedef struct { struct wterl_event_handlers { WT_EVENT_HANDLER handlers; ErlNifEnv *msg_env_error; - ErlNifMutex *error_mutex; + pthread_mutex_t error_mutex; ErlNifEnv *msg_env_message; - ErlNifMutex *message_mutex; + pthread_mutex_t message_mutex; ErlNifEnv *msg_env_progress; - ErlNifMutex *progress_mutex; + pthread_mutex_t progress_mutex; ErlNifPid to_pid; }; @@ -204,7 +205,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); if (c->session) c->session->close(c->session, NULL); - enif_free(c); + free(c); num_evicted++; } } @@ -226,7 +227,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) { struct wterl_ctx *c; - enif_mutex_lock(conn_handle->cache_mutex); + pthread_mutex_lock(&conn_handle->cache_mutex); c = STAILQ_FIRST(&conn_handle->cache); while (c != NULL) { if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs @@ -237,7 +238,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) } c = STAILQ_NEXT(c, entries); } - enif_mutex_unlock(conn_handle->cache_mutex); + pthread_mutex_unlock(&conn_handle->cache_mutex); DPRINTF("cache_find: [%u] %s (%p)", conn_handle->cache_size, c ? "hit" : "miss", c); return c; } @@ -251,7 +252,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) static void __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) { - enif_mutex_lock(conn_handle->cache_mutex); + pthread_mutex_lock(&conn_handle->cache_mutex); __ctx_cache_evict(conn_handle); STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries); conn_handle->cache_size += 1; @@ -262,7 +263,7 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) sz++; } #endif - enif_mutex_unlock(conn_handle->cache_mutex); + pthread_mutex_unlock(&conn_handle->cache_mutex); DPRINTF("cache_add: [%u:%u] (%p)", sz, conn_handle->cache_size, c); } @@ -333,7 +334,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, int rc = conn->open_session(conn, NULL, session_config, &session); if (rc != 0) return rc; size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len; - c = enif_alloc(s); // TODO: enif_alloc_resource() + c = malloc(s); // TODO: enif_alloc_resource() if (c == NULL) { session->close(session, NULL); return ENOMEM; @@ -355,7 +356,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, c->ci[i].config = __copy_str_into(&p, config); rc = session->open_cursor(session, uri, NULL, config, &c->ci[i].cursor); if (rc != 0) { - enif_free(c); + free(c); session->close(session, NULL); // this will free the cursors too va_end(ap); return rc; @@ -391,7 +392,7 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx /** * Close all sessions and all cursors open on any objects. * - * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + * Note: always call within pthread_mutex_lock/unlock(conn_handle->cache_mutex) */ void __close_all_sessions(WterlConnHandle *conn_handle) @@ -405,7 +406,7 @@ __close_all_sessions(WterlConnHandle *conn_handle) STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); conn_handle->cache_size -= 1; c->session->close(c->session, NULL); - enif_free(c); + free(c); c = n; } } @@ -413,7 +414,7 @@ __close_all_sessions(WterlConnHandle *conn_handle) /** * Close cursors open on 'uri' object. * - * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + * Note: always call within pthread_mutex_lock/unlock(conn_handle->cache_mutex) */ void __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) @@ -431,7 +432,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); conn_handle->cache_size -= 1; c->session->close(c->session, NULL); - enif_free(c); + free(c); break; } } @@ -461,7 +462,7 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message) ErlNifPid *to_pid; int rc = 0; - enif_mutex_lock(eh->error_mutex); + pthread_mutex_lock(&eh->error_mutex); msg_env = eh->msg_env_error; to_pid = &eh->to_pid; if (msg_env) { @@ -476,7 +477,7 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message) } else { rc = (fprintf(stderr, "[%d] %s\n", error, message) >= 0 ? 0 : EIO); } - enif_mutex_unlock(eh->error_mutex); + pthread_mutex_unlock(&eh->error_mutex); return rc; } @@ -499,7 +500,7 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message) ErlNifPid *to_pid; int rc = 0; - enif_mutex_lock(eh->message_mutex); + pthread_mutex_lock(&eh->message_mutex); msg_env = eh->msg_env_message; to_pid = &eh->to_pid; if (msg_env) { @@ -512,7 +513,7 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message) } else { rc = (printf("%s\n", message) >= 0 ? 0 : EIO); } - enif_mutex_unlock(eh->message_mutex); + pthread_mutex_unlock(&eh->message_mutex); return rc; } @@ -536,7 +537,7 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6 ErlNifPid *to_pid; int rc = 0; - enif_mutex_lock(eh->progress_mutex); + pthread_mutex_lock(&eh->progress_mutex); msg_env = eh->msg_env_progress; to_pid = &eh->to_pid; if (msg_env) { @@ -551,7 +552,7 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6 } else { rc = (printf("[%llu] %s\n", PRIuint64(counter), operation) >= 0 ? 0 : EIO); } - enif_mutex_unlock(eh->progress_mutex); + pthread_mutex_unlock(&eh->progress_mutex); return rc; } @@ -637,7 +638,7 @@ ASYNC_NIF_DECL( return; } if (session_config.size > 1) { - char *sc = enif_alloc(session_config.size); + char *sc = malloc(session_config.size); if (!sc) { enif_release_resource(conn_handle); ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM)); @@ -648,8 +649,8 @@ ASYNC_NIF_DECL( } else { conn_handle->session_config = NULL; } - conn_handle->cache_mutex = enif_mutex_create("conn_handle"); - enif_mutex_lock(conn_handle->cache_mutex); + pthread_mutex_init(&conn_handle->cache_mutex, 0); + pthread_mutex_lock(&conn_handle->cache_mutex); conn_handle->conn = conn; ERL_NIF_TERM result = enif_make_resource(env, conn_handle); @@ -658,7 +659,7 @@ ASYNC_NIF_DECL( conn_handle->cache_size = 0; enif_release_resource(conn_handle); - enif_mutex_unlock(conn_handle->cache_mutex); + pthread_mutex_unlock(&conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); } else @@ -695,16 +696,16 @@ ASYNC_NIF_DECL( { // work /* Free up the shared sessions and cursors. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); if (args->conn_handle->session_config) { - enif_free((char *)args->conn_handle->session_config); + free((char *)args->conn_handle->session_config); args->conn_handle->session_config = NULL; } WT_CONNECTION* conn = args->conn_handle->conn; int rc = conn->close(conn, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); - enif_mutex_destroy(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); + pthread_mutex_destroy(&args->conn_handle->cache_mutex); memset(args->conn_handle, 0, sizeof(WterlConnHandle)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); @@ -800,12 +801,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -817,7 +818,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -827,7 +828,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". */ rc = session->drop(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -867,12 +868,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->oldname); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -884,7 +885,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -895,7 +896,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". */ rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -935,12 +936,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -952,14 +953,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->salvage(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1067,12 +1068,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1085,7 +1086,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1100,7 +1101,7 @@ ASYNC_NIF_DECL( mess. */ if (!args->from_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1108,7 +1109,7 @@ ASYNC_NIF_DECL( rc = session->open_cursor(session, args->uri, NULL, "raw", &start); if (rc != 0) { session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1118,7 +1119,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1133,7 +1134,7 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->stop, &stop_key)) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1142,7 +1143,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1153,7 +1154,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1171,7 +1172,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1208,12 +1209,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1225,14 +1226,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->upgrade(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1270,12 +1271,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->cache_mutex); + pthread_mutex_lock(&args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1287,14 +1288,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->verify(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->cache_mutex); + pthread_mutex_unlock(&args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -2220,13 +2221,13 @@ static void __wterl_conn_dtor(ErlNifEnv* env, void* obj) UNUSED(env); WterlConnHandle *conn_handle = (WterlConnHandle *)obj; - if (conn_handle->cache_mutex) { + if (conn_handle->conn) { DPRINTF("conn_handle dtor free'ing (%p)", obj); - enif_mutex_lock(conn_handle->cache_mutex); + pthread_mutex_lock(&conn_handle->cache_mutex); __close_all_sessions(conn_handle); conn_handle->conn->close(conn_handle->conn, NULL); - enif_mutex_unlock(conn_handle->cache_mutex); - enif_mutex_destroy(conn_handle->cache_mutex); + pthread_mutex_unlock(&conn_handle->cache_mutex); + pthread_mutex_destroy(&conn_handle->cache_mutex); } } @@ -2265,15 +2266,15 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn"); ATOM_MSG_PID = enif_make_atom(env, "message_pid"); - struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data)); + struct wterl_priv_data *priv = malloc(sizeof(struct wterl_priv_data)); if (!priv) return ENOMEM; memset(priv, 0, sizeof(struct wterl_priv_data)); struct wterl_event_handlers *eh = &priv->eh; - eh->error_mutex = enif_mutex_create("error_mutex"); - eh->message_mutex = enif_mutex_create("message_mutex"); - eh->progress_mutex = enif_mutex_create("progress_mutex"); + pthread_mutex_init(&eh->error_mutex, 0); + pthread_mutex_init(&eh->message_mutex, 0); + pthread_mutex_init(&eh->progress_mutex, 0); /* Process the load_info array of tuples, we expect: [{wterl_vsn, "a version string"}, @@ -2296,7 +2297,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); if (!priv->async_nif_priv) { memset(priv, 0, sizeof(struct wterl_priv_data)); - enif_free(priv); + free(priv); return ENOMEM; } *priv_data = priv; @@ -2331,9 +2332,9 @@ on_unload(ErlNifEnv *env, void *priv_data) is no chance that the event handler functions will be called so we can be sure that there won't be a race on eh.msg_env in the callback functions. */ struct wterl_event_handlers *eh = &priv->eh; - enif_mutex_destroy(eh->error_mutex); - enif_mutex_destroy(eh->message_mutex); - enif_mutex_destroy(eh->progress_mutex); + pthread_mutex_destroy(&eh->error_mutex); + pthread_mutex_destroy(&eh->message_mutex); + pthread_mutex_destroy(&eh->progress_mutex); if (eh->msg_env_message) enif_free_env(eh->msg_env_message); if (eh->msg_env_error) @@ -2342,7 +2343,7 @@ on_unload(ErlNifEnv *env, void *priv_data) enif_free_env(eh->msg_env_progress); memset(priv, 0, sizeof(struct wterl_priv_data)); - enif_free(priv); + free(priv); priv_data = NULL; }