WIP: race condition on close causes double-free

Gregory Burd 2013-09-04 13:09:17 -04:00
parent f4a1126fde
commit 9496f99019


@@ -24,12 +24,11 @@
 extern "C" {
 #endif
 
-#include <assert.h>
-#include <urcu.h>
-#include <urcu/cds.h>
-#include <urcu-defer.h>
-#include <urcu/arch.h>
-#include <urcu/tls-compat.h>
+#include <stdlib.h>
+#include <urcu.h>            /* RCU flavor */
+#include <urcu/arch.h>       /* RCU Architecture */
+#include <urcu/tls-compat.h> /* RCU Thread-local storage */
+#include <urcu/rculfqueue.h> /* RCU Lock-free queue */
 
 #ifndef UNUSED
 #define UNUSED(v) ((void)(v))
@@ -59,6 +58,7 @@ struct async_nif_req_entry {
   void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
   void (*fn_post)(void *);
   struct cds_lfq_node_rcu queue_entry;
+  struct rcu_head rcu_head;
 };
 
 struct async_nif_work_queue {
@@ -74,6 +74,7 @@ struct async_nif_worker_entry {
   struct async_nif_state *async_nif;
   struct async_nif_work_queue *q;
   struct cds_lfq_node_rcu queue_entry;
+  struct rcu_head rcu_head;
 };
 
 struct async_nif_state {
@@ -84,7 +85,6 @@ struct async_nif_state {
   unsigned int next_q;
   struct cds_lfq_queue_rcu recycled_req_queue;
   unsigned int num_reqs;
-  struct rcu_head rcu;
   struct async_nif_work_queue queues[];
 };
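The three struct hunks above replace the single rcu_head in async_nif_state with a per-entry `struct rcu_head` in both the request and worker entries. A per-entry rcu_head is what `call_rcu()` needs in order to defer freeing an individual node until every pre-existing RCU reader has finished, which is the standard way to rule out the close-time double-free / use-after-free this WIP is chasing. A minimal sketch, not part of this commit, assuming the `async_nif_req_entry` declaration above (the callback name `free_req_cb` is borrowed from a comment later in this file):

    #include <urcu.h>   /* call_rcu(), caa_container_of() */

    static void free_req_cb(struct rcu_head *head)
    {
        struct async_nif_req_entry *req =
            caa_container_of(head, struct async_nif_req_entry, rcu_head);
        free(req);  /* runs only after a grace period, so no reader can still hold req */
    }

    /* ... once `req` is unlinked from every RCU-visible queue ... */
    call_rcu(&req->rcu_head, free_req_cb);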
@@ -139,7 +139,7 @@ struct async_nif_state {
     if (affinity) \
       h = ((unsigned int)affinity) % async_nif->num_queues; \
     ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
-    if (!reply) { \
+    if (!reply) { \
       fn_post_##decl (args); \
       async_nif_recycle_req(req, async_nif); \
       free(copy_of_args); \
@@ -197,16 +197,16 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
   /* Look for a request on our Lock-Free/RCU Queue first. */
   rcu_read_lock();
   node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
-  req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
   rcu_read_unlock();
 
-  if (req) {
+  if (node) {
     /* The goal is to reuse these req structs, not malloc/free them
        repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
        We reuse this req, then when exiting we'll free all of them at
        once. */
-    return req;
+    req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
   } else {
+    /* The reuse queue is empty, create a new request. */
     if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
       req = malloc(sizeof(struct async_nif_req_entry));
       if (req) {
@@ -268,18 +268,18 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   /* Before creating a new worker thread join threads which have exited. */
   for(;;) {
     struct cds_lfq_node_rcu *node;
 
     rcu_read_lock();
     node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
-    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
     rcu_read_unlock();
-    if (worker) {
-      void *exit_value = 0; /* We ignore the thread_join's exit value. */
-      enif_thread_join(worker->tid, &exit_value);
-      free(worker);
-      uatomic_dec(&async_nif->num_active_workers);
-    } else
-      break;
+
+    if (!node) break; /* Queue is empty. */
+
+    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
+    void *exit_value = 0; /* We ignore the thread_join's exit value. */
+    enif_thread_join(worker->tid, &exit_value);
+    free(worker);
+    uatomic_dec(&async_nif->num_active_workers);
   }
 
   if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
@@ -308,6 +308,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   struct async_nif_work_queue *q = NULL;
   double avg_depth = 0.0;
 
+  //rcu_register_thread();
+
   /* Either we're choosing a queue based on some affinity/hinted value or we
      need to select the next queue in the rotation and atomically update that
      global value (next_q is shared across worker threads) . */
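The commented-out `//rcu_register_thread();` above points at another liburcu rule: with the explicit-registration flavors (urcu, urcu-qsbr; urcu-bp registers automatically), a thread must call `rcu_register_thread()` before its first `rcu_read_lock()`, and `async_nif_enqueue_req()` runs on Erlang scheduler threads, which would explain why registering the calling thread is being experimented with here. A minimal sketch, not part of this commit, of what registration looks like in a thread the NIF itself owns (hypothetical function):

    #include <urcu.h>

    static void *worker_thread_fn(void *arg)
    {
        rcu_register_thread();       /* must precede the first rcu_read_lock() */

        /* ... dequeue and process requests under rcu_read_lock()/rcu_read_unlock() ... */

        rcu_unregister_thread();     /* unregister before the thread exits */
        return NULL;
    }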
@@ -334,10 +336,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
     }
   }
   if (avg_depth) avg_depth /= n;
   q = &async_nif->queues[qid];
-
-  if (uatomic_read(&async_nif->shutdown))
-    return 0;
 
   /* Try not to enqueue a request into a queue that isn't keeping up with
      the request volume. */
@@ -346,8 +345,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   }
 
   /* If the for loop finished then we didn't find a suitable queue for this
-     request, meaning we're backed up so trigger eagain. */
-  if (i == async_nif->num_queues) return 0;
+     request (e.g. we're backed up servicing requests) or the shutdown flag was
+     set. Returning '0' will toss this request and free its resources.*/
+  if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
+    return 0;
 
   /* Add the request to the queue. */
   cds_lfq_node_init_rcu(&req->queue_entry);
@@ -356,25 +357,28 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   rcu_read_unlock();
   URCU_TLS(nr_enqueues)++;
   uatomic_inc(&q->depth);
-  uatomic_inc(&q->num_workers);
 
   /* We've selected a queue for this new request now check to make sure there are
      enough workers actively processing requests on this queue. */
   while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
     switch(async_nif_start_worker(async_nif, q)) {
-    case EINVAL: case ENOMEM: default: return 0;
-    case EAGAIN: continue;
-    case 0: uatomic_inc(&q->num_workers); goto done;
+    case EINVAL:
+    case ENOMEM:
+      return 0;
+    default:
+    case EAGAIN:
+      continue;
+    case 0:
+      uatomic_inc(&q->num_workers);
+      goto done;
     }
-  } done:;
-  /* Build the term before releasing the lock so as not to race on the use of
-     the req pointer (which will soon become invalid in another thread
-     performing the request). */
+  }done:;
+
   double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
-  ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
+  return enif_make_tuple2(req->env, ATOM_OK,
                          enif_make_tuple2(req->env, ATOM_ENQUEUED,
                                           enif_make_double(req->env, pct_full)));
-  return reply;
 }
 
 /**
@@ -402,10 +406,10 @@ async_nif_worker_fn(void *arg)
 
     rcu_read_lock();
     node = cds_lfq_dequeue_rcu(&q->req_queue);
-    req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
     rcu_read_unlock();
 
-    if (req) {
+    if (node) {
+      req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
       uatomic_dec(&q->depth);
       URCU_TLS(nr_dequeues)++;
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
@@ -445,12 +449,13 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
   while(uatomic_read(&async_nif->num_active_workers) > 0) {
     struct async_nif_worker_entry *worker;
     struct cds_lfq_node_rcu *node;
 
     rcu_read_lock();
     node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
-    worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
     rcu_read_unlock();
-    if (worker) {
+
+    if (node) {
+      worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
       void *exit_value = 0; /* We ignore the thread_join's exit value. */
       enif_thread_join(worker->tid, &exit_value);
       free(worker);
@@ -464,24 +469,33 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
     q = &async_nif->queues[i];
 
     /* Worker threads are stopped, now toss anything left in the queue. */
-    do {
-      node = cds_lfq_dequeue_rcu(&q->req_queue);
-      if (node) {
-        struct async_nif_req_entry *req;
-        req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
-        enif_clear_env(req->env);
-        enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
-        req->fn_post(req->args);
-        free(req->args);
-        enif_free_env(req->env);
-        free(req);
-      }
-    } while(node);
+    for (;;) {
+      rcu_read_lock();
+      node = cds_lfq_dequeue_rcu(&q->req_queue);
+      rcu_read_unlock();
+
+      if (!node) break; /* Queue is empty. */
+
+      struct async_nif_req_entry *req;
+      req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
+      enif_clear_env(req->env);
+      enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
+      req->fn_post(req->args);
+      free(req->args);
+      enif_free_env(req->env);
+      free(req);
+    }
     cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
   }
 
   /* Free any req structures sitting unused on the recycle queue. */
-  while ((node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue)) != NULL) {
+  for (;;) {
+    rcu_read_lock();
+    node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
+    rcu_read_unlock();
+
+    if (!node) break; /* Queue is empty. */
+
     struct async_nif_req_entry *req;
     req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
     enif_free_env(req->env);
@@ -534,6 +548,7 @@ async_nif_load(ErlNifEnv *env)
     if (num_queues < 2)
       num_queues = 2;
   }
+  num_queues = 1; //TODO remove me.
 
   /* Init our portion of priv_data's module-specific state. */
   async_nif = malloc(sizeof(struct async_nif_state) +