Remove unused num_contexts count. Cleanup the session/cursor cache
init process.
This commit is contained in:
parent
728d2281e0
commit
95d8a28453
2 changed files with 19 additions and 23 deletions
|
@ -71,7 +71,6 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
WT_CONNECTION *conn;
|
WT_CONNECTION *conn;
|
||||||
const char *session_config;
|
const char *session_config;
|
||||||
unsigned int num_contexts;
|
|
||||||
ErlNifMutex *contexts_mutex;
|
ErlNifMutex *contexts_mutex;
|
||||||
WterlCtx contexts[ASYNC_NIF_MAX_WORKERS];
|
WterlCtx contexts[ASYNC_NIF_MAX_WORKERS];
|
||||||
} WterlConnHandle;
|
} WterlConnHandle;
|
||||||
|
@ -108,24 +107,17 @@ ASYNC_NIF_INIT(wterl);
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
__create_new_session_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
|
__init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
|
||||||
{
|
{
|
||||||
/* Create a context for this worker thread to reuse. */
|
/* Create a context for this worker thread to reuse. */
|
||||||
enif_mutex_lock(conn_handle->contexts_mutex);
|
|
||||||
if (ctx->session != NULL) {
|
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
WT_CONNECTION *conn = conn_handle->conn;
|
WT_CONNECTION *conn = conn_handle->conn;
|
||||||
int rc = conn->open_session(conn, NULL, conn_handle->session_config, &ctx->session);
|
int rc = conn->open_session(conn, NULL, conn_handle->session_config, &ctx->session);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
|
ctx->session = NULL;
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
ctx->cursors = kh_init(cursors);
|
ctx->cursors = kh_init(cursors);
|
||||||
conn_handle->num_contexts++;
|
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -139,9 +131,11 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION *
|
||||||
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
|
||||||
if (ctx->session == NULL)
|
if (ctx->session == NULL) {
|
||||||
rc = __create_new_session_cache(conn_handle, ctx);
|
enif_mutex_lock(conn_handle->contexts_mutex);
|
||||||
|
rc = __init_session_and_cursor_cache(conn_handle, ctx);
|
||||||
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
|
}
|
||||||
*session = ctx->session;
|
*session = ctx->session;
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -155,9 +149,10 @@ void
|
||||||
__close_all_sessions(WterlConnHandle *conn_handle)
|
__close_all_sessions(WterlConnHandle *conn_handle)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
|
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
|
||||||
WterlCtx *ctx = &conn_handle->contexts[i];
|
WterlCtx *ctx = &conn_handle->contexts[i];
|
||||||
if (ctx->session) {
|
if (ctx->session != NULL) {
|
||||||
WT_SESSION *session = ctx->session;
|
WT_SESSION *session = ctx->session;
|
||||||
khash_t(cursors) *h = ctx->cursors;
|
khash_t(cursors) *h = ctx->cursors;
|
||||||
khiter_t itr;
|
khiter_t itr;
|
||||||
|
@ -176,7 +171,6 @@ __close_all_sessions(WterlConnHandle *conn_handle)
|
||||||
ctx->session = NULL;
|
ctx->session = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn_handle->num_contexts = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -188,9 +182,10 @@ void
|
||||||
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
|
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
|
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
|
||||||
WterlCtx *ctx = &conn_handle->contexts[i];
|
WterlCtx *ctx = &conn_handle->contexts[i];
|
||||||
if (ctx->session) {
|
if (ctx->session != NULL) {
|
||||||
khash_t(cursors) *h = ctx->cursors;
|
khash_t(cursors) *h = ctx->cursors;
|
||||||
khiter_t itr = kh_get(cursors, h, (char *)uri);
|
khiter_t itr = kh_get(cursors, h, (char *)uri);
|
||||||
if (itr != kh_end(h)) {
|
if (itr != kh_end(h)) {
|
||||||
|
@ -218,8 +213,11 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
|
||||||
khiter_t itr;
|
khiter_t itr;
|
||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
if (ctx->session == NULL)
|
if (ctx->session == NULL) {
|
||||||
__create_new_session_cache(conn_handle, ctx); // TODO: check return value
|
enif_mutex_lock(conn_handle->contexts_mutex);
|
||||||
|
__init_session_and_cursor_cache(conn_handle, ctx); // TODO: check return value
|
||||||
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
h = ctx->cursors;
|
h = ctx->cursors;
|
||||||
itr = kh_get(cursors, h, (char *)uri);
|
itr = kh_get(cursors, h, (char *)uri);
|
||||||
|
@ -245,7 +243,6 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
|
||||||
memcpy(key, uri, 128);
|
memcpy(key, uri, 128);
|
||||||
int itr_status;
|
int itr_status;
|
||||||
itr = kh_put(cursors, h, key, &itr_status);
|
itr = kh_put(cursors, h, key, &itr_status);
|
||||||
//assert(itr != kh_end());
|
|
||||||
kh_value(h, itr) = *cursor;
|
kh_value(h, itr) = *cursor;
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
}
|
}
|
||||||
|
@ -349,7 +346,6 @@ ASYNC_NIF_DECL(
|
||||||
conn_handle->contexts_mutex = enif_mutex_create(NULL);
|
conn_handle->contexts_mutex = enif_mutex_create(NULL);
|
||||||
enif_mutex_lock(conn_handle->contexts_mutex);
|
enif_mutex_lock(conn_handle->contexts_mutex);
|
||||||
conn_handle->conn = conn;
|
conn_handle->conn = conn;
|
||||||
conn_handle->num_contexts = 0;
|
|
||||||
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
|
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
|
||||||
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
|
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
|
||||||
|
|
||||||
|
@ -420,9 +416,9 @@ ASYNC_NIF_DECL(
|
||||||
kh_value(h, itr) = NULL;
|
kh_value(h, itr) = NULL;
|
||||||
}
|
}
|
||||||
enif_mutex_unlock(args->priv->conns_mutex);
|
enif_mutex_unlock(args->priv->conns_mutex);
|
||||||
|
// TODO: dtor? enif_mutex_destroy(args->conn_handle->contexts_mutex);
|
||||||
enif_mutex_unlock(args->conn_handle->contexts_mutex);
|
enif_mutex_unlock(args->conn_handle->contexts_mutex);
|
||||||
enif_mutex_destroy(args->conn_handle->contexts_mutex);
|
|
||||||
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
|
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
|
||||||
},
|
},
|
||||||
{ // post
|
{ // post
|
||||||
|
|
|
@ -516,7 +516,7 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
|
||||||
-define(TEST_DATA_DIR, "test/wterl.basic").
|
-define(TEST_DATA_DIR, "test/wterl.basic").
|
||||||
|
|
||||||
open_test_conn(DataDir) ->
|
open_test_conn(DataDir) ->
|
||||||
open_test_conn(DataDir, [{create,true},{cache_size,"100MB"}]).
|
open_test_conn(DataDir, [{create,true},{cache_size,"100MB"},{session_max, 8192}]).
|
||||||
open_test_conn(DataDir, OpenConfig) ->
|
open_test_conn(DataDir, OpenConfig) ->
|
||||||
{ok, CWD} = file:get_cwd(),
|
{ok, CWD} = file:get_cwd(),
|
||||||
rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
|
rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
|
||||||
|
|
Loading…
Reference in a new issue