Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9
2 changed files with 20 additions and 10 deletions
|
@ -375,7 +375,13 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
q = &async_nif->queues[i];
|
q = &async_nif->queues[i];
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
enif_mutex_lock(q->reqs_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Set the shutdown flag so that worker threads will no continue
|
||||||
|
executing requests. */
|
||||||
async_nif->shutdown = 1;
|
async_nif->shutdown = 1;
|
||||||
|
|
||||||
|
/* Make sure to wake up all worker threads sitting on conditional
|
||||||
|
wait for work so that they can see it's time to exit. */
|
||||||
for (i = 0; i < num_queues; i++) {
|
for (i = 0; i < num_queues; i++) {
|
||||||
q = &async_nif->queues[i];
|
q = &async_nif->queues[i];
|
||||||
enif_cond_broadcast(q->reqs_cnd);
|
enif_cond_broadcast(q->reqs_cnd);
|
||||||
|
@ -388,7 +394,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
|
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free req structres sitting on the recycle queue. */
|
/* Free any req structures sitting unused on the recycle queue. */
|
||||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
enif_mutex_lock(async_nif->recycled_req_mutex);
|
||||||
req = NULL;
|
req = NULL;
|
||||||
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
|
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
|
||||||
|
|
|
@ -189,6 +189,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
|
||||||
if (log > mean) {
|
if (log > mean) {
|
||||||
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
|
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
|
||||||
c->session->close(c->session, NULL);
|
c->session->close(c->session, NULL);
|
||||||
|
memset(c, 0, sizeof(struct wterl_ctx));
|
||||||
enif_free(c);
|
enif_free(c);
|
||||||
num_evicted++;
|
num_evicted++;
|
||||||
}
|
}
|
||||||
|
@ -446,6 +447,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
|
||||||
|
|
||||||
if (c != NULL) {
|
if (c != NULL) {
|
||||||
c->session->close(c->session, NULL);
|
c->session->close(c->session, NULL);
|
||||||
|
memset(c, 0, sizeof(struct wterl_ctx));
|
||||||
enif_free(c);
|
enif_free(c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -457,6 +459,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
|
||||||
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
|
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
|
||||||
conn_handle->cache_size -= 1;
|
conn_handle->cache_size -= 1;
|
||||||
c->session->close(c->session, NULL);
|
c->session->close(c->session, NULL);
|
||||||
|
memset(c, 0, sizeof(struct wterl_ctx));
|
||||||
enif_free(c);
|
enif_free(c);
|
||||||
c = n;
|
c = n;
|
||||||
}
|
}
|
||||||
|
@ -2308,6 +2311,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
|
||||||
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
|
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
|
||||||
if (!priv->async_nif_priv) {
|
if (!priv->async_nif_priv) {
|
||||||
enif_mutex_destroy(priv->conns_mutex);
|
enif_mutex_destroy(priv->conns_mutex);
|
||||||
|
memset(priv, 0, sizeof(struct wterl_priv_data));
|
||||||
enif_free(priv);
|
enif_free(priv);
|
||||||
return ENOMEM;
|
return ENOMEM;
|
||||||
}
|
}
|
||||||
|
@ -2334,21 +2338,18 @@ on_unload(ErlNifEnv *env, void *priv_data)
|
||||||
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
|
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
|
||||||
WterlConnHandle *conn_handle;
|
WterlConnHandle *conn_handle;
|
||||||
|
|
||||||
enif_mutex_lock(priv->conns_mutex);
|
if (priv_data == NULL)
|
||||||
|
return;
|
||||||
|
|
||||||
/* Lock the cache mutex before unloading the async_nif to prevent new
|
enif_mutex_lock(priv->conns_mutex);
|
||||||
work from coming in while shutting down. */
|
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
|
||||||
SLIST_FOREACH(conn_handle, &priv->conns, conns) {
|
SLIST_FOREACH(conn_handle, &priv->conns, conns) {
|
||||||
enif_mutex_lock(conn_handle->cache_mutex);
|
enif_mutex_lock(conn_handle->cache_mutex);
|
||||||
}
|
|
||||||
|
|
||||||
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
|
|
||||||
|
|
||||||
SLIST_FOREACH(conn_handle, &priv->conns, conns) {
|
|
||||||
__close_all_sessions(conn_handle);
|
__close_all_sessions(conn_handle);
|
||||||
conn_handle->conn->close(conn_handle->conn, NULL);
|
conn_handle->conn->close(conn_handle->conn, NULL);
|
||||||
if (conn_handle->session_config)
|
if (conn_handle->session_config != NULL) {
|
||||||
enif_free((void*)conn_handle->session_config);
|
enif_free((void*)conn_handle->session_config);
|
||||||
|
}
|
||||||
enif_mutex_unlock(conn_handle->cache_mutex);
|
enif_mutex_unlock(conn_handle->cache_mutex);
|
||||||
enif_mutex_destroy(conn_handle->cache_mutex);
|
enif_mutex_destroy(conn_handle->cache_mutex);
|
||||||
}
|
}
|
||||||
|
@ -2369,7 +2370,10 @@ on_unload(ErlNifEnv *env, void *priv_data)
|
||||||
|
|
||||||
enif_mutex_unlock(priv->conns_mutex);
|
enif_mutex_unlock(priv->conns_mutex);
|
||||||
enif_mutex_destroy(priv->conns_mutex);
|
enif_mutex_destroy(priv->conns_mutex);
|
||||||
|
memset(priv, 0, sizeof(struct wterl_priv_data));
|
||||||
enif_free(priv);
|
enif_free(priv);
|
||||||
|
|
||||||
|
priv_data = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
|
Loading…
Reference in a new issue