diff --git a/c_src/async_nif.h b/c_src/async_nif.h
index 91ab183..4f714c2 100644
--- a/c_src/async_nif.h
+++ b/c_src/async_nif.h
@@ -24,12 +24,11 @@
 extern "C" {
 #endif
 
-#include
-#include
-#include
-#include
-#include
-#include
+#include
+#include <urcu.h>             /* RCU flavor */
+#include <urcu/arch.h>        /* RCU Architecture */
+#include <urcu/tls-compat.h>  /* RCU Thread-local storage */
+#include <urcu/rculfqueue.h>  /* RCU Lock-free queue */
 
 #ifndef UNUSED
 #define UNUSED(v) ((void)(v))
@@ -59,6 +58,7 @@ struct async_nif_req_entry {
   void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
   void (*fn_post)(void *);
   struct cds_lfq_node_rcu queue_entry;
+  struct rcu_head rcu_head;
 };
 
 struct async_nif_work_queue {
@@ -74,6 +74,7 @@ struct async_nif_worker_entry {
   struct async_nif_state *async_nif;
   struct async_nif_work_queue *q;
   struct cds_lfq_node_rcu queue_entry;
+  struct rcu_head rcu_head;
 };
 
 struct async_nif_state {
@@ -84,7 +85,6 @@ struct async_nif_state {
   unsigned int next_q;
   struct cds_lfq_queue_rcu recycled_req_queue;
   unsigned int num_reqs;
-  struct rcu_head rcu;
   struct async_nif_work_queue queues[];
 };
 
@@ -139,7 +139,7 @@ struct async_nif_state {
     if (affinity)                                                      \
       h = ((unsigned int)affinity) % async_nif->num_queues;            \
     ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h);     \
-    if (!reply) {                                                     \
+    if (!reply) {                                                      \
       fn_post_##decl (args);                                           \
       async_nif_recycle_req(req, async_nif);                           \
       free(copy_of_args);                                              \
@@ -197,16 +197,16 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
   /* Look for a request on our Lock-Free/RCU Queue first. */
   rcu_read_lock();
   node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
-  req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
   rcu_read_unlock();
 
-  if (req) {
+  if (node) {
     /* The goal is to reuse these req structs, not malloc/free them
        repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
        We reuse this req, then when exiting we'll free all of them at once. */
-    return req;
+    req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
   } else {
+    /* The reuse queue is empty; create a new request. */
     if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
       req = malloc(sizeof(struct async_nif_req_entry));
       if (req) {
@@ -268,18 +268,18 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   /* Before creating a new worker thread join threads which have exited. */
   for(;;) {
     struct cds_lfq_node_rcu *node;
+
     rcu_read_lock();
     node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
-    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
     rcu_read_unlock();
 
-    if (worker) {
-      void *exit_value = 0; /* We ignore the thread_join's exit value. */
-      enif_thread_join(worker->tid, &exit_value);
-      free(worker);
-      uatomic_dec(&async_nif->num_active_workers);
-    } else
-      break;
+    if (!node) break; /* Queue is empty. */
+
+    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
+    void *exit_value = 0; /* We ignore the thread_join's exit value. */
+    enif_thread_join(worker->tid, &exit_value);
+    free(worker);
+    uatomic_dec(&async_nif->num_active_workers);
   }
 
   if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
@@ -308,6 +308,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   struct async_nif_work_queue *q = NULL;
   double avg_depth = 0.0;
 
+  //rcu_register_thread();
+
   /* Either we're choosing a queue based on some affinity/hinted value or we
      need to select the next queue in the rotation and atomically update that
      global value (next_q is shared across worker threads) . */
@@ -334,10 +336,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
     }
   }
   if (avg_depth) avg_depth /= n;
-
   q = &async_nif->queues[qid];
-  if (uatomic_read(&async_nif->shutdown))
-    return 0;
 
   /* Try not to enqueue a request into a queue that isn't keeping up with
      the request volume. */
@@ -346,8 +345,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   }
 
   /* If the for loop finished then we didn't find a suitable queue for this
-     request, meaning we're backed up so trigger eagain. */
-  if (i == async_nif->num_queues) return 0;
+     request (e.g. we're backed up servicing requests) or the shutdown flag
+     was set. Returning '0' will toss this request and free its resources. */
+  if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
+    return 0;
 
   /* Add the request to the queue. */
   cds_lfq_node_init_rcu(&req->queue_entry);
@@ -356,25 +357,28 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   rcu_read_unlock();
   URCU_TLS(nr_enqueues)++;
   uatomic_inc(&q->depth);
+  uatomic_inc(&q->num_workers);
 
   /* We've selected a queue for this new request now check to make sure
      there are enough workers actively processing requests on this queue. */
   while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
     switch(async_nif_start_worker(async_nif, q)) {
-    case EINVAL: case ENOMEM: default: return 0;
-    case EAGAIN: continue;
-    case 0: uatomic_inc(&q->num_workers); goto done;
+    case EINVAL:
+    case ENOMEM:
+      return 0;
+    default:
+    case EAGAIN:
+      continue;
+    case 0:
+      uatomic_inc(&q->num_workers);
+      goto done;
     }
-  } done:;
+  }done:;
 
-  /* Build the term before releasing the lock so as not to race on the use of
-     the req pointer (which will soon become invalid in another thread
-     performing the request). */
   double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
-  ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
-                         enif_make_tuple2(req->env, ATOM_ENQUEUED,
-                           enif_make_double(req->env, pct_full)));
-  return reply;
+  return enif_make_tuple2(req->env, ATOM_OK,
+           enif_make_tuple2(req->env, ATOM_ENQUEUED,
+             enif_make_double(req->env, pct_full)));
 }
 
 /**
@@ -402,10 +406,10 @@ async_nif_worker_fn(void *arg)
 
     rcu_read_lock();
    node = cds_lfq_dequeue_rcu(&q->req_queue);
-    req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
     rcu_read_unlock();
 
-    if (req) {
+    if (node) {
+      req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
       uatomic_dec(&q->depth);
       URCU_TLS(nr_dequeues)++;
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
@@ -445,12 +449,13 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
   while(uatomic_read(&async_nif->num_active_workers) > 0) {
     struct async_nif_worker_entry *worker;
     struct cds_lfq_node_rcu *node;
+
     rcu_read_lock();
     node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
-    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
     rcu_read_unlock();
 
-    if (worker) {
+    if (node) {
+      worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
       void *exit_value = 0; /* We ignore the thread_join's exit value. */
       enif_thread_join(worker->tid, &exit_value);
       free(worker);
@@ -464,24 +469,33 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
     q = &async_nif->queues[i];
 
     /* Worker threads are stopped, now toss anything left in the queue. */
-    do {
+    for (;;) {
+      rcu_read_lock();
       node = cds_lfq_dequeue_rcu(&q->req_queue);
-      if (node) {
-        struct async_nif_req_entry *req;
-        req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
-        enif_clear_env(req->env);
-        enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
-        req->fn_post(req->args);
-        free(req->args);
-        enif_free_env(req->env);
-        free(req);
-      }
-    } while(node);
+      rcu_read_unlock();
+
+      if (!node) break; /* Queue is empty. */
+
+      struct async_nif_req_entry *req;
+      req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
+      enif_clear_env(req->env);
+      enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
+      req->fn_post(req->args);
+      free(req->args);
+      enif_free_env(req->env);
+      free(req);
+    }
     cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
   }
 
   /* Free any req structures sitting unused on the recycle queue. */
-  while ((node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue)) != NULL) {
+  for (;;) {
+    rcu_read_lock();
+    node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
+    rcu_read_unlock();
+
+    if (!node) break; /* Queue is empty. */
+
     struct async_nif_req_entry *req;
     req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
     enif_free_env(req->env);
@@ -534,6 +548,7 @@ async_nif_load(ErlNifEnv *env)
     if (num_queues < 2)
       num_queues = 2;
   }
+  num_queues = 1; //TODO remove me.
 
   /* Init our portion of priv_data's module-specific state. */
   async_nif = malloc(sizeof(struct async_nif_state) +
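
Review note on the recurring fix in this patch: every dequeue site now takes the RCU read-side lock around cds_lfq_dequeue_rcu(), drops it, and tests the returned node for NULL *before* calling caa_container_of(). The code being replaced converted first and tested the result (`if (req)`, `if (worker)`); because queue_entry is not the first member of either struct, caa_container_of() applied to a NULL node yields a non-NULL garbage pointer, so those tests could never fail. Below is a minimal, self-contained sketch of the corrected pattern against liburcu (link with -lurcu -lurcu-cds); `struct item` and `try_dequeue()` are illustrative names invented for this note, not identifiers from async_nif.h.

#include <stdio.h>
#include <urcu.h>             /* RCU flavor */
#include <urcu/rculfqueue.h>  /* RCU Lock-free queue */

struct item {
    int value;                            /* payload first, so queue_entry ... */
    struct cds_lfq_node_rcu queue_entry;  /* ... sits at a non-zero offset */
};

/* Dequeue one item, or NULL when the queue is empty. Test the node pointer
   itself: caa_container_of(NULL, ...) is a bogus non-NULL pointer whenever
   the member is not at offset zero. */
static struct item *
try_dequeue(struct cds_lfq_queue_rcu *q)
{
    struct cds_lfq_node_rcu *node;

    rcu_read_lock();
    node = cds_lfq_dequeue_rcu(q);
    rcu_read_unlock();

    if (!node)
        return NULL;
    return caa_container_of(node, struct item, queue_entry);
}

int
main(void)
{
    struct cds_lfq_queue_rcu q;
    struct item it = { .value = 42 };

    rcu_register_thread();        /* every thread using RCU must register */
    cds_lfq_init_rcu(&q, call_rcu);

    cds_lfq_node_init_rcu(&it.queue_entry);
    rcu_read_lock();              /* enqueue requires a read-side critical section */
    cds_lfq_enqueue_rcu(&q, &it.queue_entry);
    rcu_read_unlock();

    struct item *got = try_dequeue(&q);
    printf("dequeued: %d\n", got ? got->value : -1);

    cds_lfq_destroy_rcu(&q);      /* queue must be empty by this point */
    rcu_unregister_thread();
    return 0;
}

One caveat the sketch glosses over: a node returned by cds_lfq_dequeue_rcu() may still be referenced by concurrent readers, so real code must defer reclamation past a grace period. The patch sidesteps exactly that by recycling req structs on recycled_req_queue instead of freeing them, as the comment in async_nif_reuse_req() explains.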