diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 13bbce7..9f5b94f 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -25,7 +25,6 @@ extern "C" { #endif #include -#include "fifo_q.h" #include "queue.h" #include "stats.h" @@ -44,15 +43,17 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); + STAILQ_ENTRY(async_nif_req_entry) entries; }; -DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); + struct async_nif_work_queue { - STAT_DEF(qwait); - unsigned int workers; + unsigned int num_workers; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; - FIFO_QUEUE_TYPE(reqs) reqs; + STAILQ_HEAD(reqs, async_nif_req_entry) reqs; + STAT_DEF(qwait); + STAT_DEF(wt); }; struct async_nif_worker_entry { @@ -64,14 +65,13 @@ struct async_nif_worker_entry { }; struct async_nif_state { - STAT_DEF(qwait); unsigned int shutdown; ErlNifMutex *we_mutex; unsigned int we_active; SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; - FIFO_QUEUE_TYPE(reqs) recycled_reqs; + STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; unsigned int num_reqs; ErlNifMutex *recycled_req_mutex; struct async_nif_work_queue queues[]; @@ -130,14 +130,14 @@ struct async_nif_state { req->fn_post = (void (*)(void *))fn_post_ ## decl; \ int h = -1; \ if (affinity) \ - h = affinity % async_nif->num_queues; \ + h = ((unsigned int)affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "shutdown")); \ + enif_make_atom(env, "eagain")); \ } \ return reply; \ } @@ -188,7 +188,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif) ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); - if (fifo_q_empty(reqs, async_nif->recycled_reqs)) { + if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { req = enif_alloc(sizeof(struct async_nif_req_entry)); if (req) { @@ -204,10 +204,10 @@ async_nif_reuse_req(struct async_nif_state *async_nif) } } } else { - req = fifo_q_get(reqs, async_nif->recycled_reqs); + req = STAILQ_FIRST(&async_nif->recycled_reqs); + STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries); } enif_mutex_unlock(async_nif->recycled_req_mutex); - __stat_tick(async_nif->qwait_stat); return req; } @@ -222,13 +222,12 @@ void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { ErlNifEnv *env = NULL; - __stat_tock(async_nif->qwait_stat); enif_mutex_lock(async_nif->recycled_req_mutex); enif_clear_env(req->env); env = req->env; memset(req, 0, sizeof(struct async_nif_req_entry)); req->env = env; - fifo_q_put(reqs, async_nif->recycled_reqs, req); + STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries); enif_mutex_unlock(async_nif->recycled_req_mutex); } @@ -247,6 +246,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ enif_mutex_lock(async_nif->we_mutex); +#if 0 // TODO: we = SLIST_FIRST(&async_nif->we_joining); while(we != NULL) { struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); @@ -257,6 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ async_nif->we_active--; we = n; } +#endif if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { enif_mutex_unlock(async_nif->we_mutex); @@ -287,11 +288,10 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int qid = 0; + unsigned int i, qid = 0; unsigned int n = async_nif->num_queues; struct async_nif_work_queue *q = NULL; - double await = 0; - double await_inthisq = 0; + double avg_wait_across_q, avg_wt_service_time, avg_wait_this_q = 0; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -304,6 +304,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en async_nif->next_q = qid; } + avg_wait_across_q = 0; + for (i = 0; i < async_nif->num_queues; i++) { + avg_wait_across_q += __stat_mean(async_nif->queues[i].qwait_stat); + } + if (avg_wait_across_q != 0) + avg_wait_across_q /= async_nif->num_queues; + /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ do { @@ -313,36 +320,36 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Now that we hold the lock, check for shutdown. As long as we hold this lock either a) we're shutting down so exit now or b) this queue will be valid until we release the lock. */ + if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); return 0; } - if (!fifo_q_full(reqs, q->reqs)) { - await = __stat_mean_log2(async_nif->qwait_stat); - await_inthisq = __stat_mean_log2(q->qwait_stat); - //DPRINTF("q:%d %f/%f", qid, await_inthisq, await); - if (await_inthisq > await) { - enif_mutex_unlock(q->reqs_mutex); - qid = (qid + 1) % async_nif->num_queues; - q = &async_nif->queues[qid]; - } else { - // q->reqs_mutex unlocked at end of function - break; - } + + avg_wait_this_q = __stat_mean(q->qwait_stat); + avg_wt_service_time = __stat_mean(q->wt_stat); + DPRINTF("q:%d w:%u %f/%f(%f) %c", qid, q->num_workers, avg_wait_this_q, avg_wait_across_q, avg_wt_service_time, avg_wait_this_q <= avg_wait_across_q ? 't' : 'f'); + if (avg_wait_this_q <= avg_wait_across_q) break; + else { + enif_mutex_unlock(q->reqs_mutex); + qid = (qid + 1) % async_nif->num_queues; + q = &async_nif->queues[qid]; } } while(n-- > 0); + if (n == 0) return 0; // All queues are full, trigger eagain + /* We hold the queue's lock, and we've seletect a reasonable queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ - if (q->workers < 2 || await_inthisq > await) { + if (q->num_workers == 0 || avg_wait_this_q >= avg_wt_service_time) { if (async_nif_start_worker(async_nif, q) == 0) - q->workers++; + q->num_workers++; } /* And finally add the request to the queue. */ __stat_tick(q->qwait_stat); - fifo_q_put(reqs, q->reqs, req); + STAILQ_INSERT_TAIL(&q->reqs, req, entries); /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread @@ -376,10 +383,9 @@ async_nif_worker_fn(void *arg) enif_mutex_unlock(q->reqs_mutex); break; } - if (fifo_q_empty(reqs, q->reqs)) { + if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - __stat_reset(q->qwait_stat); - if (q->workers > 2) { + if (q->num_workers > 2) { enif_mutex_unlock(q->reqs_mutex); break; } else { @@ -387,21 +393,23 @@ async_nif_worker_fn(void *arg) goto check_again_for_work; } } else { - assert(fifo_q_size(reqs, q->reqs) > 0); - assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs)); /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ - req = fifo_q_get(reqs, q->reqs); + req = STAILQ_FIRST(&q->reqs); + STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); /* Ensure that there is at least one other worker thread watching this queue. */ enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); - /* Perform the work. */ - req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); __stat_tock(q->qwait_stat); + /* Perform the work. */ + __stat_tick(q->wt_stat); + req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); + __stat_tock(q->wt_stat); + /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -418,7 +426,11 @@ async_nif_worker_fn(void *arg) enif_mutex_lock(async_nif->we_mutex); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); enif_mutex_unlock(async_nif->we_mutex); - q->workers--; + q->num_workers--; + if (q->num_workers == 0) { + __stat_reset(q->qwait_stat); + __stat_reset(q->wt_stat); + } enif_thread_exit(0); return 0; } @@ -433,8 +445,6 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) struct async_nif_worker_entry *we = NULL; UNUSED(env); - __stat_print_histogram(async_nif->qwait_stat, "wterl"); - /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first lock all the worker queues, then set shutdown to true, then @@ -452,8 +462,10 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; + __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl q-wait"); enif_mutex_unlock(q->reqs_mutex); } + __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl service time"); /* Join for the now exiting worker threads. */ while(async_nif->we_active > 0) { @@ -480,7 +492,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) /* Worker threads are stopped, now toss anything left in the queue. */ req = NULL; - fifo_q_foreach(reqs, q->reqs, req, { + req = STAILQ_FIRST(&q->reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), @@ -489,8 +503,8 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_free_env(req->env); enif_free(req->args); enif_free(req); - }); - fifo_q_free(reqs, q->reqs); + req = n; + } enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } @@ -498,11 +512,13 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) /* Free any req structures sitting unused on the recycle queue. */ enif_mutex_lock(async_nif->recycled_req_mutex); req = NULL; - fifo_q_foreach(reqs, async_nif->recycled_reqs, req, { + req = STAILQ_FIRST(&async_nif->recycled_reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_free_env(req->env); enif_free(req); - }); - fifo_q_free(reqs, async_nif->recycled_reqs); + req = n; + } enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); @@ -550,18 +566,18 @@ async_nif_load() async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; - async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); + STAILQ_INIT(&async_nif->recycled_reqs); async_nif->recycled_req_mutex = enif_mutex_create(NULL); - async_nif->qwait_stat = __stat_init(1000); async_nif->we_mutex = enif_mutex_create(NULL); SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; - q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); + STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - q->qwait_stat = __stat_init(1000); + q->qwait_stat = __stat_init(100000); + q->wt_stat = __stat_init(100000); } return async_nif; } diff --git a/c_src/cas.h b/c_src/cas.h index 61c1f61..2f35cb4 100644 --- a/c_src/cas.h +++ b/c_src/cas.h @@ -55,9 +55,15 @@ #define CACHE_LINE_SIZE 64 +#define ATOMIC_INCR(_v,_newval) \ +do { \ + __typeof(_v) __val = (_v); \ + while ( (_newval = CASIO(&(_v),__val,__val+1)) != __val ) \ + __val = _newval; \ +} while ( 0 ) #define ATOMIC_ADD_TO(_v,_x) \ do { \ - int __val = (_v), __newval; \ + __typeof(_v) __val = (_v), __newval; \ while ( (__newval = CASIO(&(_v),__val,__val+(_x))) != __val ) \ __val = __newval; \ } while ( 0 ) diff --git a/c_src/fifo_q.h b/c_src/fifo_q.h deleted file mode 100644 index bbc4ff0..0000000 --- a/c_src/fifo_q.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * fifo_q: a macro-based implementation of a FIFO Queue - * - * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. - * Author: Gregory Burd - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef __FIFO_Q_H__ -#define __FIFO_Q_H__ - -#if defined(__cplusplus) -extern "C" { -#endif - -#define fifo_t(name) \ - struct fifo_q__ ## name * -#define FIFO_QUEUE_TYPE(name) \ - struct fifo_q__ ## name * -#define DECL_FIFO_QUEUE(name, type) \ - struct fifo_q__ ## name { \ - unsigned int h, t, s; \ - type *items[]; \ - }; \ - static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ - int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ - struct fifo_q__ ## name *q = enif_alloc(sz); \ - if (!q) \ - return 0; \ - memset(q, 0, sz); \ - q->s = n + 1; \ - return q; \ - } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ - static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ - q->items[q->h] = n; \ - q->h = (q->h + 1) % q->s; \ - return n; \ - } \ - static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ - type *n = q->items[q->t]; \ - q->items[q->t] = 0; \ - q->t = (q->t + 1) % q->s; \ - return n; \ - } \ - static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ - return (q->h - q->t + q->s) % q->s; \ - } \ - static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ - return q->s - 1; \ - } \ - static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ - return (q->t == q->h); \ - } \ - static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ - return ((q->h + 1) % q->s) == q->t; \ - } - -#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) -#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) -#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) -#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) -#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) -#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) -#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) -#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) -#define fifo_q_foreach(name, queue, item, task) do { \ - while(!fifo_q_ ## name ## _empty(queue)) { \ - item = fifo_q_ ## name ## _get(queue); \ - do task while(0); \ - } \ - } while(0); - - -#if defined(__cplusplus) -} -#endif - -#endif // __FIFO_Q_H__ diff --git a/c_src/stats.c b/c_src/stats.c index 7b9c28f..9e6938c 100644 --- a/c_src/stats.c +++ b/c_src/stats.c @@ -75,7 +75,7 @@ __stat_mean(struct stat *s) h = (h + 1) % s->num_samples; } if (mean > 0) - mean /= (double)(s->n < s->num_samples ? s->n : s->num_samples); + mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples); return mean; } @@ -83,16 +83,16 @@ double __stat_mean_log2(struct stat *s) { uint32_t i; - double mean; + double mean = 0.0; if (!s) return 0.0; - mean = 0; for (i = 0; i < 64; i++) mean += (s->histogram[i] * i); if (mean > 0) - mean /= (double)s->n; + mean /= (s->n < s->num_samples ? s->n : s->num_samples); + DPRINTF("n: %u %llu %f", s->n < 64 ? s->n : 64, PRIuint64(s), mean); return mean; } diff --git a/c_src/wterl.c b/c_src/wterl.c index 95c4cf4..7526ef3 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -1427,7 +1427,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(0, args->uri, __strlen(args->uri)); + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work @@ -1486,7 +1486,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(0, args->uri, __strlen(args->uri)); + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work @@ -1566,7 +1566,7 @@ ASYNC_NIF_DECL( args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(0, args->uri, __strlen(args->uri)); + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work