WIP: remove potential for infinite loops with CAS and fix a few issues in async

This commit is contained in:
Gregory Burd 2013-06-12 08:09:51 -04:00
parent 110b482962
commit 4460434db1
3 changed files with 33 additions and 18 deletions

View file

@ -34,9 +34,9 @@ extern "C" {
#define UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 512
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
STAT_DECL(qwait, 1000);
@ -104,12 +104,12 @@ struct async_nif_state {
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
req = async_nif_reuse_req(async_nif); \
new_env = req->env; \
if (!req) { \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
new_env = req->env; \
do pre_block while(0); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
@ -267,16 +267,16 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
if (fifo_q_size(reqs, q->reqs) > async_nif->num_queues) {
if (!fifo_q_full(reqs, q->reqs)) {
double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) {
if (await_inthisq > await) {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
}
} else {
break;
}
// TODO: at some point add in work sheading/stealing
} while(n-- > 0);
@ -467,7 +467,7 @@ async_nif_load()
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->num_workers = 2 * num_queues;
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS;
async_nif->next_q = 0;
async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);

View file

@ -33,7 +33,7 @@ extern "C" {
#include <stdarg.h>
#define DPRINTF(fmt, ...) \
do { \
fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__); \
fprintf(stderr, "%s:%d (%s) " fmt "\n", __FILE__, __LINE__, __func__, __VA_ARGS__); \
fflush(stderr); \
} while(0)
#define DPUTS(arg) DPRINTF("%s", arg)

View file

@ -336,7 +336,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
struct wterl_ctx **ctx,
int count, const char *session_config, ...)
{
int i = 0;
int i = 3;
va_list ap;
uint64_t sig;
const char *arg;
@ -347,33 +347,37 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
sig = __ctx_cache_sig(session_config, ap, count);
va_end(ap);
*ctx = NULL;
do {
c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) == c) {
if (c == NULL) {
// mru miss:
DPRINTF("[%.4u] mru miss: %llu != NULL", worker_id, PRIuint64(sig));
*ctx = NULL;
} else {
if (c->sig == sig) {
// mru hit:
DPRINTF("[%.4u] mru hit: %llu", worker_id, PRIuint64(sig));
*ctx = c;
break;
} else {
// mru missmatch:
DPRINTF("[%.4u] mru missmatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig));
__ctx_cache_add(conn_handle, c);
*ctx = NULL;
}
}
} else {
// CAS failed, retry...
continue;
}
} while(0);
// CAS failed, retry up to 3 times
} while(i--);
if (*ctx == NULL) {
// check the cache
(*ctx) = __ctx_cache_find(conn_handle, sig);
if ((*ctx) == NULL) {
// cache miss:
DPRINTF("[%.4u] cache miss: %llu [%d]", worker_id, PRIuint64(sig), conn_handle->cache_size);
WT_CONNECTION *conn = conn_handle->conn;
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, session_config, &session);
@ -402,7 +406,10 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
}
}
va_end(ap);
} // else { cache hit }
} else {
// cache hit:
DPRINTF("[%.4u] cache hit: %llu [%d]", worker_id, PRIuint64(sig), conn_handle->cache_size);
}
}
return 0;
}
@ -423,9 +430,14 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
cursor->reset(cursor);
}
do {
c = conn_handle->mru_ctx[worker_id];
} while(CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c);
c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) {
__ctx_cache_add(conn_handle, ctx);
} else {
if (c != NULL) {
__ctx_cache_add(conn_handle, c);
}
}
}
/**
@ -1462,15 +1474,18 @@ ASYNC_NIF_DECL(
cursor->set_key(cursor, &item_key);
rc = cursor->search(cursor);
if (rc != 0) {
__release_ctx(args->conn_handle, worker_id, ctx);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = cursor->get_value(cursor, &item_value);
if (rc != 0) {
__release_ctx(args->conn_handle, worker_id, ctx);
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
ERL_NIF_TERM value;
unsigned char *bin = enif_make_new_binary(env, item_value.size, &value);
memcpy(bin, item_value.data, item_value.size);