Ensure that on EAGAIN we continue to try to spawn a worker. When workers finish with a queue, have them migrate to the other queues looking for work.

Gregory Burd 2013-07-25 13:29:16 -04:00
parent 122963133a
commit 3627ff8690
3 changed files with 42 additions and 23 deletions
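Below is a minimal standalone sketch (not the code from this commit) of the retry-on-EAGAIN idea: keep attempting to start a worker thread while thread creation fails with the transient EAGAIN error, and stop on hard errors. The names spawn_worker and worker_main are hypothetical; the actual change drives async_nif_start_worker() from async_nif_enqueue_req(), as shown in the first diff below.

    /* Hedged sketch using plain pthreads: retry while EAGAIN (transient
       resource exhaustion), give up on any other error. */
    #include <errno.h>
    #include <pthread.h>
    #include <stddef.h>

    static void *worker_main(void *arg) { (void)arg; return NULL; }

    static int spawn_worker(pthread_t *tid)
    {
        for (;;) {
            int rc = pthread_create(tid, NULL, worker_main, NULL);
            if (rc == 0)
                return 0;      /* started; caller bumps its worker count */
            if (rc != EAGAIN)
                return rc;     /* EINVAL/EPERM/...: do not retry */
            /* EAGAIN: loop and try again; a bounded retry count or a short
               back-off would avoid spinning under sustained pressure. */
        }
    }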


@@ -37,6 +37,7 @@ extern "C" {
 #define ASYNC_NIF_WORKER_QUEUE_SIZE 100
 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
 struct async_nif_req_entry {
   ERL_NIF_TERM ref;
   ErlNifEnv *env;
@@ -53,6 +54,7 @@ struct async_nif_work_queue {
   unsigned int depth;
   ErlNifMutex *reqs_mutex;
   ErlNifCond *reqs_cnd;
+  struct async_nif_work_queue *next;
   STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
 };
@@ -148,14 +150,14 @@ struct async_nif_state {
 #define ASYNC_NIF_LOAD(name, priv) do { \
     if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create(NULL); \
+      name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
     enif_mutex_lock(name##_async_nif_coord); \
     priv = async_nif_load(); \
     enif_mutex_unlock(name##_async_nif_coord); \
   } while(0);
 #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
     if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create(NULL); \
+      name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
     enif_mutex_lock(name##_async_nif_coord); \
     async_nif_unload(env, priv); \
     enif_mutex_unlock(name##_async_nif_coord); \
@@ -164,7 +166,7 @@ struct async_nif_state {
   } while(0);
 #define ASYNC_NIF_UPGRADE(name, env) do { \
     if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create(NULL); \
+      name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
     enif_mutex_lock(name##_async_nif_coord); \
     async_nif_upgrade(env); \
     enif_mutex_unlock(name##_async_nif_coord); \
@@ -197,7 +199,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
       env = enif_alloc_env();
       if (env) {
         req->env = env;
-        async_nif->num_reqs++;
+        __sync_fetch_and_add(&async_nif->num_reqs, 1);
       } else {
         enif_free(req);
         req = NULL;
@@ -287,7 +289,7 @@ static ERL_NIF_TERM
 async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
 {
   /* Identify the most appropriate worker for this request. */
-  unsigned int i, qid = 0;
+  unsigned int i, last_qid, qid = 0;
   struct async_nif_work_queue *q = NULL;
   double avg_depth;
@@ -297,9 +299,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   if (hint >= 0) {
     qid = (unsigned int)hint;
   } else {
-    qid = async_nif->next_q;
-    qid = (qid + 1) % async_nif->num_queues;
-    async_nif->next_q = qid;
+    do {
+      last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
+      qid = (last_qid + 1) % async_nif->num_queues;
+    } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
   }
   /* Now we inspect and interate across the set of queues trying to select one
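The do/while above is a lock-free round-robin: read next_q atomically, compute its successor, and publish it only if no other enqueuing thread raced us. A hedged, self-contained sketch of the same pattern using the GCC __sync builtins (next_q and num_queues stand in for the async_nif_state members):

    /* Sketch: pick the next queue id without taking a lock. */
    static unsigned int next_queue_id(volatile unsigned int *next_q,
                                      unsigned int num_queues)
    {
        unsigned int last, qid;
        do {
            last = __sync_fetch_and_add(next_q, 0);       /* atomic read */
            qid  = (last + 1) % num_queues;               /* successor */
        } while (!__sync_bool_compare_and_swap(next_q, last, qid));
        return qid;
    }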
@@ -343,12 +346,19 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   /* Add the request to the queue. */
   STAILQ_INSERT_TAIL(&q->reqs, req, entries);
-  q->depth++;
+  __sync_fetch_and_add(&q->depth, 1);
   /* We've selected a queue for this new request now check to make sure there are
      enough workers actively processing requests on this queue. */
-  if (q->depth > q->num_workers || q->num_workers == 0)
-    if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
+ retry:
+  if (q->depth > q->num_workers || q->num_workers == 0) {
+    switch(async_nif_start_worker(async_nif, q)) {
+    case 0: __sync_fetch_and_add(&q->num_workers, 1); break;
+    case EAGAIN: goto retry;
+    case EINVAL: case ENOMEM: default: return 0;
+    }
+  }
   /* Build the term before releasing the lock so as not to race on the use of
      the req pointer (which will soon become invalid in another thread
@@ -373,6 +383,7 @@ async_nif_worker_fn(void *arg)
   struct async_nif_state *async_nif = we->async_nif;
   struct async_nif_work_queue *q = we->q;
   struct async_nif_req_entry *req = NULL;
+  unsigned int tries = async_nif->num_queues;
   for(;;) {
     /* Examine the request queue, are there things to be done? */
@@ -386,7 +397,14 @@ async_nif_worker_fn(void *arg)
       /* Queue is empty so we wait for more work to arrive. */
       if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
         enif_mutex_unlock(q->reqs_mutex);
-        break;
+        if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
+        else {
+          tries--;
+          __sync_fetch_and_add(&q->num_workers, -1);
+          q = q->next;
+          __sync_fetch_and_add(&q->num_workers, 1);
+          continue; // try another queue
+        }
       } else {
         enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
         goto check_again_for_work;
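With this hunk, an idle worker on an over-staffed queue no longer exits immediately: it walks q->next around the ring of queues, deregistering from the queue it leaves and registering on the one it joins, and only exits once it has tried every queue and arrived back at its home queue. A hedged sketch of that traversal with the locking elided (struct and field names mirror the diff; find_work is a hypothetical helper, not part of this code):

    struct work_queue {
        unsigned int depth;           /* queued requests */
        unsigned int num_workers;     /* workers registered on this queue */
        struct work_queue *next;      /* ring of queues */
    };

    /* Sketch: visit each queue once, starting from `home`, and return the
       first one with pending work; NULL means the caller should exit. */
    static struct work_queue *find_work(struct work_queue *home,
                                        unsigned int num_queues)
    {
        struct work_queue *q = home;
        unsigned int tries = num_queues;
        while (tries-- > 0) {
            if (q->depth > 0)
                return q;
            __sync_fetch_and_add(&q->num_workers, -1);   /* leave this queue */
            q = q->next;
            __sync_fetch_and_add(&q->num_workers, 1);    /* join the next one */
        }
        return NULL;
    }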
@@ -396,7 +414,7 @@ async_nif_worker_fn(void *arg)
        reqs_mutex lock. Take the request off the queue. */
     req = STAILQ_FIRST(&q->reqs);
     STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
-    q->depth--;
+    __sync_fetch_and_add(&q->depth, -1);
     /* Ensure that there is at least one other worker thread watching this
        queue. */
@@ -422,7 +440,7 @@ async_nif_worker_fn(void *arg)
   enif_mutex_lock(async_nif->we_mutex);
   SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
   enif_mutex_unlock(async_nif->we_mutex);
-  q->num_workers--;
+  __sync_fetch_and_add(&q->num_workers, -1);
   enif_thread_exit(0);
   return 0;
 }
@@ -554,15 +572,16 @@ async_nif_load()
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
   STAILQ_INIT(&async_nif->recycled_reqs);
-  async_nif->recycled_req_mutex = enif_mutex_create(NULL);
-  async_nif->we_mutex = enif_mutex_create(NULL);
+  async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
+  async_nif->we_mutex = enif_mutex_create("we");
   SLIST_INIT(&async_nif->we_joining);
   for (i = 0; i < async_nif->num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     STAILQ_INIT(&q->reqs);
-    q->reqs_mutex = enif_mutex_create(NULL);
-    q->reqs_cnd = enif_cond_create(NULL);
+    q->reqs_mutex = enif_mutex_create("reqs");
+    q->reqs_cnd = enif_cond_create("reqs");
+    q->next = &async_nif->queues[(i + 1) % num_queues];
   }
   return async_nif;
 }


@@ -58,7 +58,7 @@ get_wt ()
 wt_configure ()
 {
   (cd $BASEDIR/$WT_DIR/build_posix
-   ../configure --with-pic \
+   CFLAGS+=-g ../configure --with-pic \
      --enable-snappy \
      --prefix=${BASEDIR}/system || exit 1)
 }


@@ -648,7 +648,7 @@ ASYNC_NIF_DECL(
     } else {
       conn_handle->session_config = NULL;
     }
-    conn_handle->cache_mutex = enif_mutex_create(NULL);
+    conn_handle->cache_mutex = enif_mutex_create("conn_handle");
     enif_mutex_lock(conn_handle->cache_mutex);
     conn_handle->conn = conn;
     ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
@@ -2259,9 +2259,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
   memset(priv, 0, sizeof(struct wterl_priv_data));
   struct wterl_event_handlers *eh = &priv->eh;
-  eh->error_mutex = enif_mutex_create(NULL);
-  eh->message_mutex = enif_mutex_create(NULL);
-  eh->progress_mutex = enif_mutex_create(NULL);
+  eh->error_mutex = enif_mutex_create("error_mutex");
+  eh->message_mutex = enif_mutex_create("message_mutex");
+  eh->progress_mutex = enif_mutex_create("progress_mutex");
   /* Process the load_info array of tuples, we expect:
      [{wterl_vsn, "a version string"},