Ensure that we init the shared cache when asking for a cursor before
asking for a session.
This commit is contained in:
parent
846f7f72ba
commit
728d2281e0
1 changed files with 43 additions and 18 deletions
|
@ -104,6 +104,32 @@ struct wterl_priv_data {
|
||||||
ASYNC_NIF_INIT(wterl);
|
ASYNC_NIF_INIT(wterl);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
__create_new_session_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
|
||||||
|
{
|
||||||
|
/* Create a context for this worker thread to reuse. */
|
||||||
|
enif_mutex_lock(conn_handle->contexts_mutex);
|
||||||
|
if (ctx->session != NULL) {
|
||||||
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
WT_CONNECTION *conn = conn_handle->conn;
|
||||||
|
int rc = conn->open_session(conn, NULL, conn_handle->session_config, &ctx->session);
|
||||||
|
if (rc != 0) {
|
||||||
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
ctx->cursors = kh_init(cursors);
|
||||||
|
conn_handle->num_contexts++;
|
||||||
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the per-worker reusable WT_SESSION for a worker_id.
|
* Get the per-worker reusable WT_SESSION for a worker_id.
|
||||||
*/
|
*/
|
||||||
|
@ -111,23 +137,14 @@ static int
|
||||||
__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session)
|
__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session)
|
||||||
{
|
{
|
||||||
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
|
if (ctx->session == NULL)
|
||||||
|
rc = __create_new_session_cache(conn_handle, ctx);
|
||||||
|
|
||||||
*session = ctx->session;
|
*session = ctx->session;
|
||||||
if (*session == NULL) {
|
|
||||||
/* Create a context for this worker thread to reuse. */
|
|
||||||
enif_mutex_lock(conn_handle->contexts_mutex);
|
|
||||||
WT_CONNECTION *conn = conn_handle->conn;
|
|
||||||
int rc = conn->open_session(conn, NULL, conn_handle->session_config, session);
|
|
||||||
if (rc != 0) {
|
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
ctx->session = *session;
|
|
||||||
ctx->cursors = kh_init(cursors);
|
|
||||||
conn_handle->num_contexts++;
|
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close all sessions and all cursors open on any objects.
|
* Close all sessions and all cursors open on any objects.
|
||||||
|
@ -197,8 +214,15 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
|
||||||
{
|
{
|
||||||
/* Check to see if we have a cursor open for this uri and if so reuse it. */
|
/* Check to see if we have a cursor open for this uri and if so reuse it. */
|
||||||
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
WterlCtx *ctx = &conn_handle->contexts[worker_id];
|
||||||
khash_t(cursors) *h = ctx->cursors;
|
khash_t(cursors) *h = NULL;
|
||||||
khiter_t itr = kh_get(cursors, h, (char *)uri);
|
khiter_t itr;
|
||||||
|
int rc;
|
||||||
|
|
||||||
|
if (ctx->session == NULL)
|
||||||
|
__create_new_session_cache(conn_handle, ctx); // TODO: check return value
|
||||||
|
|
||||||
|
h = ctx->cursors;
|
||||||
|
itr = kh_get(cursors, h, (char *)uri);
|
||||||
if (itr != kh_end(h)) {
|
if (itr != kh_end(h)) {
|
||||||
// key exists in hash table, retrieve it
|
// key exists in hash table, retrieve it
|
||||||
*cursor = (WT_CURSOR*)kh_value(h, itr);
|
*cursor = (WT_CURSOR*)kh_value(h, itr);
|
||||||
|
@ -206,7 +230,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
|
||||||
// key does not exist in hash table, create and insert one
|
// key does not exist in hash table, create and insert one
|
||||||
enif_mutex_lock(conn_handle->contexts_mutex);
|
enif_mutex_lock(conn_handle->contexts_mutex);
|
||||||
WT_SESSION *session = conn_handle->contexts[worker_id].session;
|
WT_SESSION *session = conn_handle->contexts[worker_id].session;
|
||||||
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
|
rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
return rc;
|
return rc;
|
||||||
|
@ -221,6 +245,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
|
||||||
memcpy(key, uri, 128);
|
memcpy(key, uri, 128);
|
||||||
int itr_status;
|
int itr_status;
|
||||||
itr = kh_put(cursors, h, key, &itr_status);
|
itr = kh_put(cursors, h, key, &itr_status);
|
||||||
|
//assert(itr != kh_end());
|
||||||
kh_value(h, itr) = *cursor;
|
kh_value(h, itr) = *cursor;
|
||||||
enif_mutex_unlock(conn_handle->contexts_mutex);
|
enif_mutex_unlock(conn_handle->contexts_mutex);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue