A great deal of cleanup. EUnit and EQC tests pass.

This commit is contained in:
Gregory Burd 2013-06-14 16:57:53 -04:00
parent ff7d1d6e20
commit 53307e8c01
4 changed files with 118 additions and 100 deletions

View file

@ -34,7 +34,7 @@ extern "C" {
#define UNUSED(v) ((void)(v)) #define UNUSED(v) ((void)(v))
#endif #endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 8
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
@ -253,6 +253,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
async_nif->next_q = qid; async_nif->next_q = qid;
} }
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix. #if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix.
unsigned int n = async_nif->num_queues; unsigned int n = async_nif->num_queues;
@ -277,12 +280,12 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
qid = (qid + 1) % async_nif->num_queues; qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
} else { } else {
// q->reqs_mutex unlocked at end of function
break; break;
} }
} }
// TODO: at some point add in work sheading/stealing // TODO: at some point add in work sheading/stealing
} while(n-- > 0); } while(n-- > 0);
#endif #endif
/* We hold the queue's lock, and we've seletect a reasonable queue for this /* We hold the queue's lock, and we've seletect a reasonable queue for this
@ -400,15 +403,6 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
} }
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
enif_free_env(req->env);
enif_free(req);
});
fifo_q_free(reqs, async_nif->recycled_reqs);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */ /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i]; q = &async_nif->queues[i];
@ -430,6 +424,15 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_cond_destroy(q->reqs_cnd); enif_cond_destroy(q->reqs_cnd);
} }
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
enif_free_env(req->env);
enif_free(req);
});
fifo_q_free(reqs, async_nif->recycled_reqs);
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));

View file

@ -24,7 +24,6 @@
extern "C" { extern "C" {
#endif #endif
#define DEBUG 1
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) #if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# undef DEBUG # undef DEBUG
# define DEBUG 0 # define DEBUG 0

View file

@ -47,6 +47,7 @@ struct wterl_ctx {
size_t sig_len; size_t sig_len;
WT_SESSION *session; WT_SESSION *session;
const char *session_config; const char *session_config;
uint32_t num_cursors;
struct cursor_info { struct cursor_info {
const char *uri; const char *uri;
const char *config; const char *config;
@ -61,7 +62,6 @@ typedef struct wterl_conn {
ErlNifMutex *cache_mutex; ErlNifMutex *cache_mutex;
uint32_t cache_size; uint32_t cache_size;
struct wterl_ctx *mru_ctx[ASYNC_NIF_MAX_WORKERS]; struct wterl_ctx *mru_ctx[ASYNC_NIF_MAX_WORKERS];
SLIST_ENTRY(wterl_conn) conns;
uint64_t histogram[64]; uint64_t histogram[64];
uint64_t histogram_count; uint64_t histogram_count;
} WterlConnHandle; } WterlConnHandle;
@ -84,8 +84,6 @@ struct wterl_event_handlers {
struct wterl_priv_data { struct wterl_priv_data {
void *async_nif_priv; // Note: must be first element in struct void *async_nif_priv; // Note: must be first element in struct
ErlNifMutex *conns_mutex;
SLIST_HEAD(conns, wterl_conn) conns;
struct wterl_event_handlers eh; struct wterl_event_handlers eh;
char wterl_vsn[512]; char wterl_vsn[512];
char wiredtiger_vsn[512]; char wiredtiger_vsn[512];
@ -106,6 +104,15 @@ static ERL_NIF_TERM ATOM_MSG_PID;
/* Global init for async_nif. */ /* Global init for async_nif. */
ASYNC_NIF_INIT(wterl); ASYNC_NIF_INIT(wterl);
static inline size_t
__strlen(const char *s)
{
if (s)
return strlen(s);
else
return 0;
}
/** /**
* A string hash function. * A string hash function.
* *
@ -217,7 +224,6 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
if (log > mean) { if (log > mean) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
memset(c, 0, sizeof(*c));
enif_free(c); enif_free(c);
num_evicted++; num_evicted++;
} }
@ -251,10 +257,10 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
conn_handle->histogram_count++; conn_handle->histogram_count++;
conn_handle->cache_size -= 1; conn_handle->cache_size -= 1;
break; break;
} else { } else {
c = n; c = n;
} }
} }
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
return c; return c;
@ -300,28 +306,46 @@ __ctx_cache_sig(const char *c, va_list ap, int count, size_t *len)
const char *arg; const char *arg;
size_t l = 0; size_t l = 0;
*len = 0;
if (c) { if (c) {
l = strlen(c); l = __strlen(c);
hash = __str_hash(hash, c, l); hash = __str_hash(hash, c, l);
crc = __crc32(crc, c, l); crc = __crc32(crc, c, l);
*len += l + 1; *len += l + 1;
} else {
*len += 1;
} }
for (i = 0; i < (2 * count); i++) { for (i = 0; i < (2 * count); i++) {
arg = va_arg(ap, const char *); arg = va_arg(ap, const char *);
if (arg) { if (arg) {
l = strlen(c); l = __strlen(arg);
hash = __str_hash(hash, arg, l); hash = __str_hash(hash, arg, l);
crc = __crc32(crc, arg, strlen(arg)); crc = __crc32(crc, arg, __strlen(arg));
*len += l + 1; *len += l + 1;
} else {
*len += 1;
} }
} }
sig = crc; sig = crc;
sig = sig << 32; sig = sig << 32;
sig &= hash; sig &= hash;
return sig; return sig;
} }
static inline char *
__copy_str_into(char **p, const char *s)
{
char *a = *p;
size_t len = __strlen(s);
memcpy(*p, s, len);
(*p)[len] = '\0';
*p += len + 1;
return a;
}
/** /**
* Get a reusable cursor that was opened for a particular worker within its * Get a reusable cursor that was opened for a particular worker within its
* session. * session.
@ -391,18 +415,16 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
(*ctx)->session = session; (*ctx)->session = session;
(*ctx)->sig_len = sig_len; (*ctx)->sig_len = sig_len;
char *p = (char *)(*ctx) + (s - sig_len); char *p = (char *)(*ctx) + (s - sig_len);
(*ctx)->session_config = p; (*ctx)->session_config = __copy_str_into(&p, session_config);
memcpy(p, session_config, strlen(session_config)); p++; (*ctx)->num_cursors = count;
session_config = arg; session_config = arg;
va_start(ap, session_config); va_start(ap, session_config);
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
const char *uri = va_arg(ap, const char *); const char *uri = va_arg(ap, const char *);
const char *config = va_arg(ap, const char *); const char *config = va_arg(ap, const char *);
// TODO: what to do (if anything) when uri or config is NULL? // TODO: what to do (if anything) when uri or config is NULL?
(*ctx)->ci[i].uri = p; (*ctx)->ci[i].uri = __copy_str_into(&p, uri);
memcpy(p, uri, strlen(uri)); p++; (*ctx)->ci[i].config = __copy_str_into(&p, config);
(*ctx)->ci[i].config = p;
memcpy(p, config, strlen(config)); p++;
rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->ci[i].cursor); rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->ci[i].cursor);
if (rc != 0) { if (rc != 0) {
enif_free(*ctx); enif_free(*ctx);
@ -425,12 +447,11 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
static void static void
__release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx *ctx) __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx *ctx)
{ {
int i, n; uint32_t i;
WT_CURSOR *cursor; WT_CURSOR *cursor;
struct wterl_ctx *c = NULL; struct wterl_ctx *c = NULL;
n = sizeof((WT_CURSOR**)ctx->ci) / sizeof(ctx->ci[0]); for (i = 0; i < ctx->num_cursors; i++) {
for (i = 0; i < n; i++) {
cursor = ctx->ci[i].cursor; cursor = ctx->ci[i].cursor;
cursor->reset(cursor); cursor->reset(cursor);
} }
@ -438,9 +459,13 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
c = conn_handle->mru_ctx[worker_id]; c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) { if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) {
__ctx_cache_add(conn_handle, ctx); __ctx_cache_add(conn_handle, ctx);
DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors);
} else { } else {
if (c != NULL) { if (c != NULL) {
__ctx_cache_add(conn_handle, c); __ctx_cache_add(conn_handle, c);
DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors);
} else {
DPRINTF("[%.4u] reset %d cursors, returnd ctx to mru", worker_id, ctx->num_cursors);
} }
} }
} }
@ -464,7 +489,6 @@ __close_all_sessions(WterlConnHandle *conn_handle)
if (c != NULL) { if (c != NULL) {
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
memset(c, 0, sizeof(*c));
enif_free(c); enif_free(c);
} }
} }
@ -476,7 +500,6 @@ __close_all_sessions(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1; conn_handle->cache_size -= 1;
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
memset(c, 0, sizeof(struct wterl_ctx));
enif_free(c); enif_free(c);
c = n; c = n;
} }
@ -491,30 +514,37 @@ void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
{ {
struct wterl_ctx *c, *n; struct wterl_ctx *c, *n;
int worker_id, cnt; int worker_id, idx, cnt;
// TODO: improve this... but for now it's easiest to just toss everything // walk the mru first, look for open cursors on matching uri
// from the mru into the cache as a first step.
for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) {
do { c = conn_handle->mru_ctx[worker_id];
c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) == c && c != NULL) {
} while(CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c); cnt = c->num_cursors;
for(idx = 0; idx < cnt; idx++) {
if (c != NULL) if (!strcmp(c->ci[idx].uri, uri)) {
__ctx_cache_add(conn_handle, c); c->session->close(c->session, NULL);
enif_free(c);
break;
} else {
if (CASPO(&conn_handle->mru_ctx[worker_id], NULL, c) != NULL) {
__ctx_cache_add(conn_handle, c);
}
}
}
}
} }
// walk the cache, look for open cursors on matching uri // next we walk the cache, look for open cursors on matching uri
c = STAILQ_FIRST(&conn_handle->cache); c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) { while (c != NULL) {
n = STAILQ_NEXT(c, entries); n = STAILQ_NEXT(c, entries);
cnt = sizeof((WT_CURSOR**)c->ci) / sizeof(c->ci[0]); cnt = c->num_cursors;
for(;cnt > 0; cnt--) { for(idx = 0; idx < cnt; idx++) {
if (!strcmp(c->ci[cnt].uri, uri)) { if (!strcmp(c->ci[idx].uri, uri)) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1; conn_handle->cache_size -= 1;
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
memset(c, 0, sizeof(*c));
enif_free(c); enif_free(c);
break; break;
} }
@ -683,7 +713,7 @@ ASYNC_NIF_DECL(
{ // pre { // pre
if (!(argc == 3 && if (!(argc == 3 &&
(enif_get_string(env, argv[0], args->homedir, sizeof args->homedir, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[0], args->homedir, sizeof(args->homedir), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[1]) && enif_is_binary(env, argv[1]) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
@ -740,12 +770,6 @@ ASYNC_NIF_DECL(
STAILQ_INIT(&conn_handle->cache); STAILQ_INIT(&conn_handle->cache);
conn_handle->cache_size = 0; conn_handle->cache_size = 0;
/* Keep track of open connections so as to free when unload/reload/etc.
are called. */
enif_mutex_lock(args->priv->conns_mutex);
SLIST_INSERT_HEAD(&args->priv->conns, conn_handle, conns);
enif_mutex_unlock(args->priv->conns_mutex);
enif_release_resource(conn_handle); enif_release_resource(conn_handle);
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
@ -783,12 +807,6 @@ ASYNC_NIF_DECL(
}, },
{ // work { // work
/* First, remove this connection from our list of open connections so
we don't free it twice when asked to unload. */
enif_mutex_lock(args->priv->conns_mutex);
SLIST_REMOVE(&args->priv->conns, args->conn_handle, wterl_conn, conns);
enif_mutex_unlock(args->priv->conns_mutex);
/* Free up the shared sessions and cursors. */ /* Free up the shared sessions and cursors. */
enif_mutex_lock(args->conn_handle->cache_mutex); enif_mutex_lock(args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle); __close_all_sessions(args->conn_handle);
@ -798,11 +816,6 @@ ASYNC_NIF_DECL(
} }
WT_CONNECTION* conn = args->conn_handle->conn; WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL); int rc = conn->close(conn, NULL);
/* Connection is closed, remove it so we don't free on unload/reload/etc. */
enif_mutex_lock(args->priv->conns_mutex);
SLIST_REMOVE(&args->priv->conns, args->conn_handle, wterl_conn, conns);
enif_mutex_unlock(args->priv->conns_mutex);
enif_mutex_unlock(args->conn_handle->cache_mutex); enif_mutex_unlock(args->conn_handle->cache_mutex);
enif_mutex_destroy(args->conn_handle->cache_mutex); enif_mutex_destroy(args->conn_handle->cache_mutex);
memset(args->conn_handle, 0, sizeof(WterlConnHandle)); memset(args->conn_handle, 0, sizeof(WterlConnHandle));
@ -836,7 +849,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -890,7 +903,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -956,8 +969,8 @@ ASYNC_NIF_DECL(
if (!(argc == 4 && if (!(argc == 4 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->oldname, sizeof(args->oldname), ERL_NIF_LATIN1) > 0) &&
(enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[2], args->newname, sizeof(args->newname), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[3]))) { enif_is_binary(env, argv[3]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -1025,7 +1038,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -1139,7 +1152,7 @@ ASYNC_NIF_DECL(
if (!(argc == 5 && if (!(argc == 5 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[4]))) { enif_is_binary(env, argv[4]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -1298,7 +1311,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -1360,7 +1373,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -1421,13 +1434,13 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
enif_keep_resource((void*)args->conn_handle); enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(0, args->uri, strlen(args->uri)); affinity = __str_hash(0, args->uri, __strlen(args->uri));
}, },
{ // work { // work
@ -1480,13 +1493,13 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
enif_keep_resource((void*)args->conn_handle); enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(0, args->uri, strlen(args->uri)); affinity = __str_hash(0, args->uri, __strlen(args->uri));
}, },
{ // work { // work
@ -1558,7 +1571,7 @@ ASYNC_NIF_DECL(
if (!(argc == 4 && if (!(argc == 4 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]) && enif_is_binary(env, argv[2]) &&
enif_is_binary(env, argv[3]))) { enif_is_binary(env, argv[3]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
@ -1566,7 +1579,7 @@ ASYNC_NIF_DECL(
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]);
enif_keep_resource((void*)args->conn_handle); enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(0, args->uri, strlen(args->uri)); affinity = __str_hash(0, args->uri, __strlen(args->uri));
}, },
{ // work { // work
@ -1628,7 +1641,7 @@ ASYNC_NIF_DECL(
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
(enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
@ -2298,6 +2311,26 @@ wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
} }
/**
* Called when a connection is free'd, our opportunity to clean up
* allocated resources.
*/
static void __wterl_conn_dtor(ErlNifEnv* env, void* obj)
{
UNUSED(env);
WterlConnHandle *conn_handle = (WterlConnHandle *)obj;
if (conn_handle->cache_mutex) {
DPRINTF("Non-NULL conn_handle (%p) to free", obj);
enif_mutex_lock(conn_handle->cache_mutex);
__close_all_sessions(conn_handle);
conn_handle->conn->close(conn_handle->conn, NULL);
enif_mutex_unlock(conn_handle->cache_mutex);
enif_mutex_destroy(conn_handle->cache_mutex);
}
}
/** /**
* Called as this driver is loaded by the Erlang BEAM runtime triggered by the * Called as this driver is loaded by the Erlang BEAM runtime triggered by the
* module's on_load directive. * module's on_load directive.
@ -2317,7 +2350,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
const ERL_NIF_TERM* option; const ERL_NIF_TERM* option;
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource",
NULL, flags, NULL); __wterl_conn_dtor, flags, NULL);
wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource",
NULL, flags, NULL); NULL, flags, NULL);
@ -2337,9 +2370,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
return ENOMEM; return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
priv->conns_mutex = enif_mutex_create(NULL);
SLIST_INIT(&priv->conns);
struct wterl_event_handlers *eh = &priv->eh; struct wterl_event_handlers *eh = &priv->eh;
eh->error_mutex = enif_mutex_create(NULL); eh->error_mutex = enif_mutex_create(NULL);
eh->message_mutex = enif_mutex_create(NULL); eh->message_mutex = enif_mutex_create(NULL);
@ -2365,7 +2395,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) { if (!priv->async_nif_priv) {
enif_mutex_destroy(priv->conns_mutex);
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv); enif_free(priv);
return ENOMEM; return ENOMEM;
@ -2391,23 +2420,12 @@ static void
on_unload(ErlNifEnv *env, void *priv_data) on_unload(ErlNifEnv *env, void *priv_data)
{ {
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data; struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
WterlConnHandle *conn_handle;
if (priv_data == NULL) if (priv_data == NULL)
return; return;
enif_mutex_lock(priv->conns_mutex); DPRINTF("unloading wterl NIF (%p)", priv);
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
SLIST_FOREACH(conn_handle, &priv->conns, conns) {
enif_mutex_lock(conn_handle->cache_mutex);
__close_all_sessions(conn_handle);
conn_handle->conn->close(conn_handle->conn, NULL);
if (conn_handle->session_config != NULL) {
enif_free((void*)conn_handle->session_config);
}
enif_mutex_unlock(conn_handle->cache_mutex);
enif_mutex_destroy(conn_handle->cache_mutex);
}
/* At this point all WiredTiger state and threads are free'd/stopped so there /* At this point all WiredTiger state and threads are free'd/stopped so there
is no chance that the event handler functions will be called so we can is no chance that the event handler functions will be called so we can
@ -2423,8 +2441,6 @@ on_unload(ErlNifEnv *env, void *priv_data)
if (eh->msg_env_progress) if (eh->msg_env_progress)
enif_free_env(eh->msg_env_progress); enif_free_env(eh->msg_env_progress);
enif_mutex_unlock(priv->conns_mutex);
enif_mutex_destroy(priv->conns_mutex);
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv); enif_free(priv);

View file

@ -618,7 +618,7 @@ various_online_test_() ->
end}, end},
{"truncate entire table", {"truncate entire table",
fun() -> fun() ->
?assertMatch(ok, truncate(ConnRef, "table:test")), ?assertMatch(ok, truncate(ConnRef, "table:test")),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
end}, end},
%% {"truncate range [<<b>>..last], ensure value outside range is found after", %% {"truncate range [<<b>>..last], ensure value outside range is found after",