From 4460434db13319fcb166ceb2b21cdadd714f3a13 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Wed, 12 Jun 2013 08:09:51 -0400 Subject: [PATCH] WIP: remove potential for infinite loops with CAS and fix a few issues in async --- c_src/async_nif.h | 16 ++++++++-------- c_src/common.h | 2 +- c_src/wterl.c | 33 ++++++++++++++++++++++++--------- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index f45e9cc..6fde4bb 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -34,9 +34,9 @@ extern "C" { #define UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 512 +#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 -#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS +#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS STAT_DECL(qwait, 1000); @@ -104,12 +104,12 @@ struct async_nif_state { return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ req = async_nif_reuse_req(async_nif); \ - new_env = req->env; \ if (!req) { \ async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ } \ + new_env = req->env; \ do pre_block while(0); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ @@ -267,16 +267,16 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_mutex_unlock(q->reqs_mutex); return 0; } - if (fifo_q_size(reqs, q->reqs) > async_nif->num_queues) { + if (!fifo_q_full(reqs, q->reqs)) { double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); - if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) { + if (await_inthisq > await) { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; q = &async_nif->queues[qid]; + } else { + break; } - } else { - break; } // TODO: at some point add in work sheading/stealing } while(n-- > 0); @@ -467,7 +467,7 @@ async_nif_load() sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = 2 * num_queues; + async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; async_nif->next_q = 0; async_nif->shutdown = 0; async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); diff --git a/c_src/common.h b/c_src/common.h index 3364573..b8324da 100644 --- a/c_src/common.h +++ b/c_src/common.h @@ -33,7 +33,7 @@ extern "C" { #include #define DPRINTF(fmt, ...) \ do { \ - fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__); \ + fprintf(stderr, "%s:%d (%s) " fmt "\n", __FILE__, __LINE__, __func__, __VA_ARGS__); \ fflush(stderr); \ } while(0) #define DPUTS(arg) DPRINTF("%s", arg) diff --git a/c_src/wterl.c b/c_src/wterl.c index 4c1b02c..9bc16f1 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -336,7 +336,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx **ctx, int count, const char *session_config, ...) { - int i = 0; + int i = 3; va_list ap; uint64_t sig; const char *arg; @@ -347,33 +347,37 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, sig = __ctx_cache_sig(session_config, ap, count); va_end(ap); + *ctx = NULL; do { c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) == c) { if (c == NULL) { // mru miss: + DPRINTF("[%.4u] mru miss: %llu != NULL", worker_id, PRIuint64(sig)); *ctx = NULL; } else { if (c->sig == sig) { // mru hit: + DPRINTF("[%.4u] mru hit: %llu", worker_id, PRIuint64(sig)); *ctx = c; + break; } else { // mru missmatch: + DPRINTF("[%.4u] mru missmatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); __ctx_cache_add(conn_handle, c); *ctx = NULL; } } - } else { - // CAS failed, retry... - continue; } - } while(0); + // CAS failed, retry up to 3 times + } while(i--); if (*ctx == NULL) { // check the cache (*ctx) = __ctx_cache_find(conn_handle, sig); if ((*ctx) == NULL) { // cache miss: + DPRINTF("[%.4u] cache miss: %llu [%d]", worker_id, PRIuint64(sig), conn_handle->cache_size); WT_CONNECTION *conn = conn_handle->conn; WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, session_config, &session); @@ -402,7 +406,10 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, } } va_end(ap); - } // else { cache hit } + } else { + // cache hit: + DPRINTF("[%.4u] cache hit: %llu [%d]", worker_id, PRIuint64(sig), conn_handle->cache_size); + } } return 0; } @@ -423,9 +430,14 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx cursor->reset(cursor); } - do { - c = conn_handle->mru_ctx[worker_id]; - } while(CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c); + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) { + __ctx_cache_add(conn_handle, ctx); + } else { + if (c != NULL) { + __ctx_cache_add(conn_handle, c); + } + } } /** @@ -1462,15 +1474,18 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->search(cursor); if (rc != 0) { + __release_ctx(args->conn_handle, worker_id, ctx); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = cursor->get_value(cursor, &item_value); if (rc != 0) { + __release_ctx(args->conn_handle, worker_id, ctx); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + ERL_NIF_TERM value; unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); memcpy(bin, item_value.data, item_value.size);