Execute NIF calls on non-scheduler threads asynchronously #6

Merged
gburd merged 72 commits from gsb-async-nifs3 into master 2013-04-23 00:54:56 +00:00
Showing only changes of commit 3310129918 - Show all commits

View file

@ -217,8 +217,10 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
if (ctx->session == NULL) {
enif_mutex_lock(conn_handle->contexts_mutex);
__init_session_and_cursor_cache(conn_handle, ctx); // TODO: check return value
rc = __init_session_and_cursor_cache(conn_handle, ctx);
enif_mutex_unlock(conn_handle->contexts_mutex);
if (rc != 0)
return rc;
}
h = ctx->cursors;
@ -351,6 +353,8 @@ ASYNC_NIF_DECL(
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
/* Keep track of open connections so as to free when unload/reload/etc.
are called. */
khash_t(conns) *h;
enif_mutex_lock(args->priv->conns_mutex);
h = args->priv->conns;
@ -361,8 +365,8 @@ ASYNC_NIF_DECL(
enif_mutex_unlock(args->priv->conns_mutex);
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
enif_mutex_unlock(conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
}
else
{
@ -407,6 +411,7 @@ ASYNC_NIF_DECL(
WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL);
/* Connection is closed, remove it so we don't free on unload/reload/etc. */
khash_t(conns) *h;
enif_mutex_lock(args->priv->conns_mutex);
h = args->priv->conns;
@ -418,8 +423,9 @@ ASYNC_NIF_DECL(
kh_value(h, itr) = NULL;
}
enif_mutex_unlock(args->priv->conns_mutex);
// TODO: dtor? enif_mutex_destroy(args->conn_handle->contexts_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
enif_mutex_destroy(args->conn_handle->contexts_mutex);
memset(args->conn_handle, 0, sizeof(WterlConnHandle));
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
@ -519,8 +525,8 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -531,8 +537,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
/* Note: we locked the context mutex and called __close_cursors_on()
@ -586,6 +592,7 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -597,8 +604,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
/* Note: we locked the context mutex and called __close_cursors_on()
@ -652,6 +660,7 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -663,8 +672,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = session->salvage(session, args->uri, (const char*)config.data);
@ -782,8 +792,8 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -795,8 +805,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -810,26 +820,26 @@ ASYNC_NIF_DECL(
mess. */
if (!args->from_first) {
if (!enif_inspect_binary(env, args->start, &start_key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
}
rc = session->open_cursor(session, args->uri, NULL, "raw", &start);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
/* Position the start cursor at the first record or the specified record. */
if (args->from_first) {
rc = start->next(start);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
} else {
@ -839,38 +849,40 @@ ASYNC_NIF_DECL(
start->set_key(start, item_start);
rc = start->search(start);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
}
if (!args->to_last) {
if (!enif_inspect_binary(env, args->stop, &stop_key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
}
rc = session->open_cursor(session, args->uri, NULL, "raw", &stop);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
/* Position the stop cursor at the last record or the specified record. */
if (args->to_last) {
rc = stop->prev(stop);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
} else {
@ -880,11 +892,11 @@ ASYNC_NIF_DECL(
stop->set_key(stop, item_stop);
rc = stop->search(stop);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
}
@ -893,9 +905,9 @@ ASYNC_NIF_DECL(
start and stop cursors which were opened referencing that URI. */
rc = session->truncate(session, NULL, start, stop, (const char*)config.data);
if (start) start->close(start);
if (stop) stop->close(stop);
if (session) session->close(session, NULL);
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
@ -938,8 +950,8 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -950,8 +962,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1000,8 +1012,8 @@ ASYNC_NIF_DECL(
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
@ -1012,8 +1024,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
@ -1211,8 +1223,8 @@ ASYNC_NIF_DECL(
item_value.size = value.size;
cursor->set_value(cursor, &item_value);
rc = cursor->insert(cursor);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1586,15 +1598,16 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
WT_ITEM item_key;
WT_ITEM item_key;
item_key.data = key.data;
item_key.size = key.size;
cursor->set_key(cursor, &item_key);
ASYNC_NIF_REPLY(__cursor_value_ret(env, cursor, cursor->search(cursor)));
ERL_NIF_TERM reply = __cursor_value_ret(env, cursor, cursor->search(cursor));
if (!args->scanning)
(void)cursor->reset(cursor);
(void)cursor->reset(cursor);
ASYNC_NIF_REPLY(reply);
},
{ // post
@ -1649,22 +1662,25 @@ ASYNC_NIF_DECL(
cursor->set_key(cursor, &item_key);
int rc = cursor->search_near(cursor, &exact);
if (rc == 0) {
if (exact == 0) {
/* an exact match */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match")));
} else if (exact < 0) {
/* cursor now positioned at the next smaller key */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt")));
} else if (exact > 0) {
/* cursor now positioned at the next larger key */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt")));
}
} else {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
if (rc != 0) {
(void)cursor->reset(cursor);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
if (!args->scanning)
(void)cursor->reset(cursor);
if (exact == 0) {
/* an exact match */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match")));
} else if (exact < 0) {
/* cursor now positioned at the next smaller key */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt")));
} else if (exact > 0) {
/* cursor now positioned at the next larger key */
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt")));
}
},
{ // post
@ -1926,40 +1942,58 @@ on_unload(ErlNifEnv *env, void *priv_data)
unsigned int i;
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
khash_t(conns) *h;
khiter_t itr;
khiter_t itr_conns;
WterlConnHandle *conn_handle;
enif_mutex_lock(priv->conns_mutex);
h = priv->conns;
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WterlConnHandle *c = kh_val(h, itr);
if (c) {
enif_mutex_lock(c->contexts_mutex);
enif_free((void*)c->session_config);
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
// TODO: free keys
kh_destroy(cursors, c->contexts[i].cursors);
for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) {
if (kh_exist(h, itr_conns)) {
conn_handle = kh_val(h, itr_conns);
if (conn_handle) {
enif_mutex_lock(conn_handle->contexts_mutex);
enif_free((void*)conn_handle->session_config);
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
WterlCtx *ctx = &conn_handle->contexts[i];
if (ctx->session != NULL) {
WT_SESSION *session = ctx->session;
khash_t(cursors) *h = ctx->cursors;
khiter_t itr_cursors;
for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) {
if (kh_exist(h, itr_cursors)) {
WT_CURSOR *cursor = kh_val(h, itr_cursors);
char *key = (char *)kh_key(h, itr_cursors);
cursor->close(cursor);
kh_del(cursors, h, itr_cursors);
enif_free(key);
kh_value(h, itr_cursors) = NULL;
}
}
kh_destroy(cursors, h);
session->close(session, NULL);
}
/* This should close all cursors and sessions. */
c->conn->close(c->conn, NULL);
}
}
/* This would have closed all cursors and sessions for us
but we do that explicitly above. */
conn_handle->conn->close(conn_handle->conn, NULL);
}
}
/* Continue to hold the context mutex while unloading the async_nif
to prevent new work from coming in while shutting down. */
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WterlConnHandle *c = kh_val(h, itr);
if (c) {
enif_mutex_unlock(c->contexts_mutex);
enif_mutex_destroy(c->contexts_mutex);
}
for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) {
if (kh_exist(h, itr_conns)) {
conn_handle = kh_val(h, itr_conns);
if (conn_handle) {
enif_mutex_unlock(conn_handle->contexts_mutex);
enif_mutex_destroy(conn_handle->contexts_mutex);
}
}
}
kh_destroy(conns, h);