Merge pull request #10 from basho-labs/gsb-workers-migrate

Worker threads should check for work in other queues before exiting.
This commit is contained in:
Gregory Burd 2013-07-26 17:12:15 -07:00
commit 8f415df69c
3 changed files with 33 additions and 22 deletions

View file

@ -53,6 +53,7 @@ struct async_nif_work_queue {
unsigned int depth; unsigned int depth;
ErlNifMutex *reqs_mutex; ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd; ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs; STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
}; };
@ -148,14 +149,14 @@ struct async_nif_state {
#define ASYNC_NIF_LOAD(name, priv) do { \ #define ASYNC_NIF_LOAD(name, priv) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \ priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
} while(0); } while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \ #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \ async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
@ -164,7 +165,7 @@ struct async_nif_state {
} while(0); } while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \ #define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \ async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
@ -197,7 +198,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
env = enif_alloc_env(); env = enif_alloc_env();
if (env) { if (env) {
req->env = env; req->env = env;
async_nif->num_reqs++; __sync_fetch_and_add(&async_nif->num_reqs, 1);
} else { } else {
enif_free(req); enif_free(req);
req = NULL; req = NULL;
@ -287,7 +288,7 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int i, qid = 0; unsigned int i, last_qid, qid = 0;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0; double avg_depth = 0.0;
@ -297,9 +298,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
if (hint >= 0) { if (hint >= 0) {
qid = (unsigned int)hint; qid = (unsigned int)hint;
} else { } else {
qid = async_nif->next_q; do {
qid = (qid + 1) % async_nif->num_queues; last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
async_nif->next_q = qid; qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
} }
/* Now we inspect and interate across the set of queues trying to select one /* Now we inspect and interate across the set of queues trying to select one
@ -342,7 +344,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Add the request to the queue. */ /* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries); STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++; __sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are /* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */ enough workers actively processing requests on this queue. */
@ -350,7 +352,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
switch(async_nif_start_worker(async_nif, q)) { switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0; case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue; case EAGAIN: continue;
case 0: q->num_workers++; goto done; case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
} }
}done:; }done:;
@ -377,6 +379,7 @@ async_nif_worker_fn(void *arg)
struct async_nif_state *async_nif = we->async_nif; struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q; struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) { for(;;) {
/* Examine the request queue, are there things to be done? */ /* Examine the request queue, are there things to be done? */
@ -390,7 +393,14 @@ async_nif_worker_fn(void *arg)
/* Queue is empty so we wait for more work to arrive. */ /* Queue is empty so we wait for more work to arrive. */
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
break; if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try another queue
}
} else { } else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex); enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work; goto check_again_for_work;
@ -400,7 +410,7 @@ async_nif_worker_fn(void *arg)
reqs_mutex lock. Take the request off the queue. */ reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs); req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
q->depth--; __sync_fetch_and_add(&q->depth, -1);
/* Ensure that there is at least one other worker thread watching this /* Ensure that there is at least one other worker thread watching this
queue. */ queue. */
@ -426,7 +436,7 @@ async_nif_worker_fn(void *arg)
enif_mutex_lock(async_nif->we_mutex); enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex); enif_mutex_unlock(async_nif->we_mutex);
q->num_workers--; __sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0); enif_thread_exit(0);
return 0; return 0;
} }
@ -558,15 +568,16 @@ async_nif_load()
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs); STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining); SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs); STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL); q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create(NULL); q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
} }
return async_nif; return async_nif;
} }

View file

@ -58,7 +58,7 @@ get_wt ()
wt_configure () wt_configure ()
{ {
(cd $BASEDIR/$WT_DIR/build_posix (cd $BASEDIR/$WT_DIR/build_posix
../configure --with-pic \ CFLAGS+=-g ../configure --with-pic \
--enable-snappy \ --enable-snappy \
--prefix=${BASEDIR}/system || exit 1) --prefix=${BASEDIR}/system || exit 1)
} }

View file

@ -648,7 +648,7 @@ ASYNC_NIF_DECL(
} else { } else {
conn_handle->session_config = NULL; conn_handle->session_config = NULL;
} }
conn_handle->cache_mutex = enif_mutex_create(NULL); conn_handle->cache_mutex = enif_mutex_create("conn_handle");
enif_mutex_lock(conn_handle->cache_mutex); enif_mutex_lock(conn_handle->cache_mutex);
conn_handle->conn = conn; conn_handle->conn = conn;
ERL_NIF_TERM result = enif_make_resource(env, conn_handle); ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
@ -2271,9 +2271,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
memset(priv, 0, sizeof(struct wterl_priv_data)); memset(priv, 0, sizeof(struct wterl_priv_data));
struct wterl_event_handlers *eh = &priv->eh; struct wterl_event_handlers *eh = &priv->eh;
eh->error_mutex = enif_mutex_create(NULL); eh->error_mutex = enif_mutex_create("error_mutex");
eh->message_mutex = enif_mutex_create(NULL); eh->message_mutex = enif_mutex_create("message_mutex");
eh->progress_mutex = enif_mutex_create(NULL); eh->progress_mutex = enif_mutex_create("progress_mutex");
/* Process the load_info array of tuples, we expect: /* Process the load_info array of tuples, we expect:
[{wterl_vsn, "a version string"}, [{wterl_vsn, "a version string"},