From 3627ff86904ac8fa0768cf6bcad4148fd12f33cf Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Thu, 25 Jul 2013 13:29:16 -0400 Subject: [PATCH] Ensure that on EAGAIN we continue to try to spawn a worker. When workers finish with a queue have them migrate to the other queues looking for work. --- c_src/async_nif.h | 55 ++++++++++++++++++++++++++++++--------------- c_src/build_deps.sh | 2 +- c_src/wterl.c | 8 +++---- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index b62f9f2..277b3e0 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -37,6 +37,7 @@ extern "C" { #define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS + struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -53,6 +54,7 @@ struct async_nif_work_queue { unsigned int depth; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; + struct async_nif_work_queue *next; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; }; @@ -148,14 +150,14 @@ struct async_nif_state { #define ASYNC_NIF_LOAD(name, priv) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord load"); \ enif_mutex_lock(name##_async_nif_coord); \ priv = async_nif_load(); \ enif_mutex_unlock(name##_async_nif_coord); \ } while(0); #define ASYNC_NIF_UNLOAD(name, env, priv) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_unload(env, priv); \ enif_mutex_unlock(name##_async_nif_coord); \ @@ -164,7 +166,7 @@ struct async_nif_state { } while(0); #define ASYNC_NIF_UPGRADE(name, env) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_upgrade(env); \ enif_mutex_unlock(name##_async_nif_coord); \ @@ -197,7 +199,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif) env = enif_alloc_env(); if (env) { req->env = env; - async_nif->num_reqs++; + __sync_fetch_and_add(&async_nif->num_reqs, 1); } else { enif_free(req); req = NULL; @@ -287,7 +289,7 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int i, qid = 0; + unsigned int i, last_qid, qid = 0; struct async_nif_work_queue *q = NULL; double avg_depth; @@ -297,9 +299,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en if (hint >= 0) { qid = (unsigned int)hint; } else { - qid = async_nif->next_q; - qid = (qid + 1) % async_nif->num_queues; - async_nif->next_q = qid; + do { + last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); + qid = (last_qid + 1) % async_nif->num_queues; + } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid)); } /* Now we inspect and interate across the set of queues trying to select one @@ -343,12 +346,19 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Add the request to the queue. */ STAILQ_INSERT_TAIL(&q->reqs, req, entries); - q->depth++; + __sync_fetch_and_add(&q->depth, 1); /* We've selected a queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ - if (q->depth > q->num_workers || q->num_workers == 0) - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; + retry: + if (q->depth > q->num_workers || q->num_workers == 0) { + switch(async_nif_start_worker(async_nif, q)) { + case 0: __sync_fetch_and_add(&q->num_workers, 1); break; + case EAGAIN: goto retry; + case EINVAL: case ENOMEM: default: return 0; + } + } + /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread @@ -373,6 +383,7 @@ async_nif_worker_fn(void *arg) struct async_nif_state *async_nif = we->async_nif; struct async_nif_work_queue *q = we->q; struct async_nif_req_entry *req = NULL; + unsigned int tries = async_nif->num_queues; for(;;) { /* Examine the request queue, are there things to be done? */ @@ -386,7 +397,14 @@ async_nif_worker_fn(void *arg) /* Queue is empty so we wait for more work to arrive. */ if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { enif_mutex_unlock(q->reqs_mutex); - break; + if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit + else { + tries--; + __sync_fetch_and_add(&q->num_workers, -1); + q = q->next; + __sync_fetch_and_add(&q->num_workers, 1); + continue; // try another queue + } } else { enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; @@ -396,7 +414,7 @@ async_nif_worker_fn(void *arg) reqs_mutex lock. Take the request off the queue. */ req = STAILQ_FIRST(&q->reqs); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); - q->depth--; + __sync_fetch_and_add(&q->depth, -1); /* Ensure that there is at least one other worker thread watching this queue. */ @@ -422,7 +440,7 @@ async_nif_worker_fn(void *arg) enif_mutex_lock(async_nif->we_mutex); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); enif_mutex_unlock(async_nif->we_mutex); - q->num_workers--; + __sync_fetch_and_add(&q->num_workers, -1); enif_thread_exit(0); return 0; } @@ -554,15 +572,16 @@ async_nif_load() async_nif->next_q = 0; async_nif->shutdown = 0; STAILQ_INIT(&async_nif->recycled_reqs); - async_nif->recycled_req_mutex = enif_mutex_create(NULL); - async_nif->we_mutex = enif_mutex_create(NULL); + async_nif->recycled_req_mutex = enif_mutex_create("recycled_req"); + async_nif->we_mutex = enif_mutex_create("we"); SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); - q->reqs_mutex = enif_mutex_create(NULL); - q->reqs_cnd = enif_cond_create(NULL); + q->reqs_mutex = enif_mutex_create("reqs"); + q->reqs_cnd = enif_cond_create("reqs"); + q->next = &async_nif->queues[(i + 1) % num_queues]; } return async_nif; } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 4294d63..e81fd5e 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -58,7 +58,7 @@ get_wt () wt_configure () { (cd $BASEDIR/$WT_DIR/build_posix - ../configure --with-pic \ + CFLAGS+=-g ../configure --with-pic \ --enable-snappy \ --prefix=${BASEDIR}/system || exit 1) } diff --git a/c_src/wterl.c b/c_src/wterl.c index 8263f40..4ebbbf1 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -648,7 +648,7 @@ ASYNC_NIF_DECL( } else { conn_handle->session_config = NULL; } - conn_handle->cache_mutex = enif_mutex_create(NULL); + conn_handle->cache_mutex = enif_mutex_create("conn_handle"); enif_mutex_lock(conn_handle->cache_mutex); conn_handle->conn = conn; ERL_NIF_TERM result = enif_make_resource(env, conn_handle); @@ -2259,9 +2259,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) memset(priv, 0, sizeof(struct wterl_priv_data)); struct wterl_event_handlers *eh = &priv->eh; - eh->error_mutex = enif_mutex_create(NULL); - eh->message_mutex = enif_mutex_create(NULL); - eh->progress_mutex = enif_mutex_create(NULL); + eh->error_mutex = enif_mutex_create("error_mutex"); + eh->message_mutex = enif_mutex_create("message_mutex"); + eh->progress_mutex = enif_mutex_create("progress_mutex"); /* Process the load_info array of tuples, we expect: [{wterl_vsn, "a version string"},