Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9
3 changed files with 14 additions and 5 deletions
|
@ -34,7 +34,7 @@ extern "C" {
|
||||||
#define UNUSED(v) ((void)(v))
|
#define UNUSED(v) ((void)(v))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define ASYNC_NIF_MAX_WORKERS 1024
|
#define ASYNC_NIF_MAX_WORKERS 128
|
||||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
||||||
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define DEBUG 1
|
||||||
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
|
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
|
||||||
# undef DEBUG
|
# undef DEBUG
|
||||||
# define DEBUG 0
|
# define DEBUG 0
|
||||||
|
|
|
@ -223,8 +223,10 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
|
||||||
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
|
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
|
||||||
conn_handle->histogram_count++;
|
conn_handle->histogram_count++;
|
||||||
conn_handle->cache_size -= 1;
|
conn_handle->cache_size -= 1;
|
||||||
}
|
break;
|
||||||
c = n;
|
} else {
|
||||||
|
c = n;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
enif_mutex_unlock(conn_handle->cache_mutex);
|
enif_mutex_unlock(conn_handle->cache_mutex);
|
||||||
return c;
|
return c;
|
||||||
|
@ -422,7 +424,7 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
|
||||||
{
|
{
|
||||||
int i, n;
|
int i, n;
|
||||||
WT_CURSOR *cursor;
|
WT_CURSOR *cursor;
|
||||||
struct wterl_ctx *c;
|
struct wterl_ctx *c = NULL;
|
||||||
|
|
||||||
n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]);
|
n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]);
|
||||||
for (i = 0; i < n; i++) {
|
for (i = 0; i < n; i++) {
|
||||||
|
@ -702,7 +704,7 @@ ASYNC_NIF_DECL(
|
||||||
conn_handle->conn = conn;
|
conn_handle->conn = conn;
|
||||||
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
|
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
|
||||||
|
|
||||||
/* Init hash table which manages the cache of session/cursor(s) */
|
/* Init list for cache of reuseable contexts */
|
||||||
STAILQ_INIT(&conn_handle->cache);
|
STAILQ_INIT(&conn_handle->cache);
|
||||||
conn_handle->cache_size = 0;
|
conn_handle->cache_size = 0;
|
||||||
|
|
||||||
|
@ -749,6 +751,12 @@ ASYNC_NIF_DECL(
|
||||||
},
|
},
|
||||||
{ // work
|
{ // work
|
||||||
|
|
||||||
|
/* First, remove this connection from our list of open connections so
|
||||||
|
we don't free it twice when asked to unload. */
|
||||||
|
enif_mutex_lock(args->priv->conns_mutex);
|
||||||
|
SLIST_REMOVE(&args->priv->conns, args->conn_handle, wterl_conn, conns);
|
||||||
|
enif_mutex_unlock(args->priv->conns_mutex);
|
||||||
|
|
||||||
/* Free up the shared sessions and cursors. */
|
/* Free up the shared sessions and cursors. */
|
||||||
enif_mutex_lock(args->conn_handle->cache_mutex);
|
enif_mutex_lock(args->conn_handle->cache_mutex);
|
||||||
__close_all_sessions(args->conn_handle);
|
__close_all_sessions(args->conn_handle);
|
||||||
|
|
Loading…
Reference in a new issue