Compare commits

...

9 commits

Author SHA1 Message Date
Gregory Burd
08acdbefc8 Retry three times, then bail out and return not found. 2013-09-04 13:11:02 -04:00
Gregory Burd
033661875e Trace line numbers within macros. 2013-09-04 13:10:16 -04:00
Gregory Burd
9496f99019 WIP: race condition on close causes double-free 2013-09-04 13:09:17 -04:00
Gregory Burd
f4a1126fde Fix a few mistakes from the merge. 2013-08-26 13:07:46 -04:00
Gregory Burd
da418b4abf Merge remote-tracking branch 'origin/master' into gsb-rcu
Conflicts:
	c_src/async_nif.h
	c_src/build_deps.sh
	src/async_nif.hrl
2013-08-21 15:56:31 -04:00
Gregory Burd
a984a6dae0 Adding a small patch to URCU build process. 2013-08-13 08:54:22 -04:00
Gregory Burd
45037cbcca Selective receive requires that the ref is created in the scope of the receive. 2013-07-30 12:53:33 -04:00
Gregory Burd
e9f9d13e47 WIP: replaced all locked queues in async_nif with lock-free RCU queues, compiles but SEGVs. 2013-07-28 20:16:52 -04:00
Gregory Burd
866b2a2ed1 Add the User-Space RCU library. 2013-07-27 08:21:40 -04:00
6 changed files with 305 additions and 232 deletions
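
The bulk of the change in c_src/async_nif.h (the first file below) replaces the BSD STAILQ/SLIST queues, and the mutex/condition-variable pairs guarding them, with liburcu's lock-free RCU queues. For orientation, here is a minimal, self-contained sketch of the cds_lfq_* enqueue/dequeue pattern the diff relies on; the struct item and the demo program are illustrative and are not part of the changeset.

/* demo.c: minimal liburcu lock-free queue usage (illustrative only).
 * Build: gcc demo.c -lurcu -lurcu-cds */
#include <stdio.h>
#include <stdlib.h>
#include <urcu.h>            /* RCU flavor: rcu_read_lock(), call_rcu() */
#include <urcu/rculfqueue.h> /* cds_lfq_* lock-free queue */

struct item {
    int value;
    struct cds_lfq_node_rcu queue_entry; /* embedded queue node */
};

int main(void)
{
    struct cds_lfq_queue_rcu q;

    rcu_init();
    rcu_register_thread();          /* every thread touching RCU data */
    cds_lfq_init_rcu(&q, call_rcu); /* queue reclaims its nodes via call_rcu */

    /* Enqueue: init the embedded node, then enqueue inside an RCU
       read-side critical section. */
    struct item *it = calloc(1, sizeof(*it));
    it->value = 42;
    cds_lfq_node_init_rcu(&it->queue_entry);
    rcu_read_lock();
    cds_lfq_enqueue_rcu(&q, &it->queue_entry);
    rcu_read_unlock();

    /* Dequeue: recover the enclosing struct with caa_container_of(). */
    rcu_read_lock();
    struct cds_lfq_node_rcu *node = cds_lfq_dequeue_rcu(&q);
    rcu_read_unlock();
    if (node) {
        struct item *got = caa_container_of(node, struct item, queue_entry);
        printf("dequeued %d\n", got->value);
        free(got); /* fine in a single-threaded demo; concurrent code would
                      defer reclamation, e.g. via call_rcu() */
    }

    cds_lfq_destroy_rcu(&q); /* queue must be empty */
    rcu_unregister_thread();
    return 0;
}

The same three steps recur throughout the diff: initialize the embedded node, enqueue or dequeue inside rcu_read_lock()/rcu_read_unlock(), and map the node back to its enclosing struct with caa_container_of().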

View file

@ -24,9 +24,11 @@
extern "C" {
#endif
#include <assert.h>
#include "queue.h"
#include <stdlib.h>
#include <urcu.h> /* RCU flavor */
#include <urcu/arch.h> /* RCU Architecture */
#include <urcu/tls-compat.h> /* RCU Thread-local storage */
#include <urcu/rculfqueue.h> /* RCU Lock-free queue */
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
@ -37,6 +39,9 @@ extern "C" {
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
@ -45,7 +50,6 @@ static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@ -53,17 +57,15 @@ struct async_nif_req_entry {
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
};
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
struct cds_lfq_queue_rcu req_queue;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
@ -71,40 +73,39 @@ struct async_nif_worker_entry {
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
};
struct async_nif_state {
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_active_workers;
struct cds_lfq_queue_rcu worker_join_queue;
unsigned int num_queues;
unsigned int next_q;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
struct cds_lfq_queue_rcu recycled_req_queue;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
struct decl##_args frame; \
static void fn_work_##decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl##_args *args) { \
UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
static void fn_post_##decl (struct decl##_args *args) { \
UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct decl##_args on_stack_args; \
struct decl##_args *args = &on_stack_args; \
struct decl##_args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \
@ -124,22 +125,22 @@ struct async_nif_state {
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
memcpy(copy_of_args, args, sizeof(struct decl##_args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_##decl ; \
req->fn_post = (void (*)(void *))fn_post_##decl; \
int h = -1; \
if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
@ -189,30 +190,38 @@ struct async_nif_state {
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
/* Look for a request on our Lock-Free/RCU Queue first. */
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
rcu_read_unlock();
if (node) {
/* The goal is to reuse these req structs, not malloc/free them
repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
We reuse this req, then when exiting we'll free all of them at
once. */
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
} else {
/* The reuse queue is empty, create a new request. */
if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
uatomic_inc(&async_nif->num_reqs);
} else {
free(req);
req = NULL;
}
}
}
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
}
@ -226,14 +235,21 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
/* Three things to do here to prepare this request struct for reuse.
1) clear the NIF Environment
2) zero out the req struct except...
3) keep a pointer to the env so we can reset it in the req */
ErlNifEnv *env = req->env;
enif_clear_env(req->env);
env = req->env;
if (req->args) free(req->args);
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex);
/* Now enqueue this request on our Lock-Free/RCU Queue to be reused later. */
cds_lfq_node_init_rcu(&req->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&async_nif->recycled_req_queue, &req->queue_entry);
rcu_read_unlock();
}
static void *async_nif_worker_fn(void *);
@ -244,41 +260,38 @@ static void *async_nif_worker_fn(void *);
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *we;
struct async_nif_worker_entry *worker;
if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
/* Before creating a new worker thread join threads which have exited. */
for(;;) {
struct cds_lfq_node_rcu *node;
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
enif_thread_join(worker->tid, &exit_value);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
worker = malloc(sizeof(struct async_nif_worker_entry));
if (!worker) return ENOMEM;
memset(worker, 0, sizeof(struct async_nif_worker_entry));
worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1);
worker->async_nif = async_nif;
worker->q = q;
return enif_thread_create(NULL,&worker->tid, &async_nif_worker_fn, (void*)worker, 0);
}
/**
@ -295,6 +308,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0;
//rcu_register_thread();
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
@ -312,59 +327,58 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
unsigned int j, d, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
d = uatomic_read(&async_nif->queues[j].depth);
if (j != qid && d != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
avg_depth += d;
}
}
if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
}
if (uatomic_read(&q->depth) <= avg_depth) break;
else qid = (qid + 1) % async_nif->num_queues;
}
/* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. Note that if we left
the loop in this way we hold no lock. */
if (i == async_nif->num_queues) return 0;
request (e.g. we're backed up servicing requests) or the shutdown flag was
set. Returning '0' will toss this request and free its resources.*/
if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
return 0;
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
__sync_fetch_and_add(&q->depth, 1);
cds_lfq_node_init_rcu(&req->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&q->req_queue, &req->queue_entry);
rcu_read_unlock();
URCU_TLS(nr_enqueues)++;
uatomic_inc(&q->depth);
uatomic_inc(&q->num_workers);
/* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
while (q->depth > q->num_workers) {
while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
case EINVAL:
case ENOMEM:
return 0;
default:
case EAGAIN:
continue;
case 0:
uatomic_inc(&q->num_workers);
goto done;
}
}done:;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
return enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
}
/**
@ -375,73 +389,46 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = worker->worker_id;
struct async_nif_state *async_nif = worker->async_nif;
struct async_nif_work_queue *l = NULL, *q = worker->q;
// TODO(gburd): set_affinity(); to the CPU_ID for this queue
rcu_register_thread();
while(q != l) {
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
if (uatomic_read(&async_nif->shutdown))
break;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leave this loop (break), which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&q->req_queue);
rcu_read_unlock();
/* Perform the work. */
if (node) {
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
uatomic_dec(&q->depth);
URCU_TLS(nr_dequeues)++;
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
l = q;
} else {
/* This queue is empty, cycle through other queues looking for work. */
uatomic_dec(&q->num_workers);
q = q->next;
uatomic_inc(&q->num_workers);
}
}
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
__sync_fetch_and_add(&q->num_workers, -1);
uatomic_dec(&q->num_workers);
cds_lfq_node_init_rcu(&worker->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&async_nif->worker_join_queue, &worker->queue_entry);
rcu_read_unlock();
rcu_unregister_thread();
enif_thread_exit(0);
return 0;
}
@ -451,83 +438,74 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct cds_lfq_node_rcu *node;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will not continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
/* Signal the worker threads, stop what you're doing and exit. */
uatomic_set(&async_nif->shutdown, 1);
/* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
while(uatomic_read(&async_nif->num_active_workers) > 0) {
struct async_nif_worker_entry *worker;
struct cds_lfq_node_rcu *node;
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
rcu_read_unlock();
if (node) {
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
enif_thread_join(worker->tid, &exit_value);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
}
enif_mutex_unlock(async_nif->we_mutex);
}
enif_mutex_destroy(async_nif->we_mutex);
cds_lfq_destroy_rcu(&async_nif->worker_join_queue); // TODO(gburd): check return val
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
req = NULL;
req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
for (;;) {
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&q->req_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
enif_free_env(req->env);
free(req);
req = n;
}
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
}
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req);
req = n;
}
for (;;) {
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_free_env(req->env);
free(req->args);
free(req);
}
cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free_all_cpu_call_rcu_data();
free(async_nif);
}
@ -551,6 +529,10 @@ async_nif_load(ErlNifEnv *env)
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Init the RCU library. */
rcu_init();
(void)create_all_cpu_call_rcu_data(0);
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
@ -566,6 +548,7 @@ async_nif_load(ErlNifEnv *env)
if (num_queues < 2)
num_queues = 2;
}
num_queues = 1; //TODO remove me.
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
@ -576,19 +559,15 @@ async_nif_load(ErlNifEnv *env)
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->we_active = 0;
async_nif->num_active_workers = 0;
async_nif->next_q = 0;
async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
cds_lfq_init_rcu(&async_nif->recycled_req_queue, call_rcu);
cds_lfq_init_rcu(&async_nif->worker_join_queue, call_rcu);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
cds_lfq_init_rcu(&q->req_queue, call_rcu);
q->next = &async_nif->queues[(i + 1) % num_queues];
}
return async_nif;
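
A note on reclamation: the comment in async_nif_reuse_req() above points out that dequeued request structs are pooled for reuse rather than freed, which is why the code never needs the call_rcu(&async_nif->rcu, free_req_cb) it mentions. For contrast, a minimal sketch of that deferred-free idiom; free_req_cb and retire_req are illustrative names, not functions in this changeset.

/* Sketch of the call_rcu() deferred-reclamation idiom that async_nif
 * deliberately avoids by pooling request structs (illustrative only). */
#include <stdlib.h>
#include <urcu.h>
#include <urcu/rculfqueue.h>

struct pooled_req {
    void *args;
    struct cds_lfq_node_rcu queue_entry;
    struct rcu_head rcu_head; /* handed to call_rcu() below */
};

/* Run by the RCU machinery after a grace period, i.e. once no RCU reader
 * can still hold a reference obtained before retire_req() was called. */
static void free_req_cb(struct rcu_head *head)
{
    struct pooled_req *req = caa_container_of(head, struct pooled_req, rcu_head);
    free(req->args);
    free(req);
}

/* Instead of calling free() right after cds_lfq_dequeue_rcu(), defer it: */
void retire_req(struct pooled_req *req)
{
    call_rcu(&req->rcu_head, free_req_cb);
}

Pooling sidesteps that grace-period bookkeeping entirely, at the cost of holding up to ASYNC_NIF_MAX_QUEUED_REQS recycled request structs until async_nif_unload() frees them in bulk.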

View file

@ -1,5 +1,8 @@
#!/bin/bash
#t=__.$$
#trap 'rm -f $t; exit 0' 0 1 2 3 13 15
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
@ -10,15 +13,22 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e
# WiredTiger
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
#WT_BRANCH=develop
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
WT_REF="tags/1.6.4"
WT_DIR=wiredtiger-`basename $WT_REF`
# Google's Snappy Compression
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
# User-space Read-Copy-Update (RCU)
URCU_REPO=git://git.lttng.org/userspace-rcu.git
URCU_REF="tags/v0.7.7"
URCU_DIR=urcu-`basename $URCU_REF`
[ `basename $PWD` != "c_src" ] && cd c_src
export BASEDIR="$PWD"
@ -31,6 +41,36 @@ export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
get_urcu ()
{
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR && git pull -u) || exit 1
else
if [ "X$URCU_REF" != "X" ]; then
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout refs/$URCU_REF || exit 1)
else
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout -b $URCU_BRANCH origin/$URCU_BRANCH || exit 1)
fi
fi
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
(cd $BASEDIR/$URCU_DIR
[ -e $BASEDIR/urcu-build.patch ] && \
(patch -p1 --forward < $BASEDIR/urcu-build.patch || exit 1 )
autoreconf -fis || exit 1
urcu_configure;
)
}
urcu_configure ()
{
(cd $BASEDIR/$URCU_DIR
LDFLAGS+="-Wl,-rpath,lib/urcu-v0.7.7/priv:lib/urcu/priv:priv" \
CFLAGS+="-m64 -Os -g -march=native -mtune=native -fPIC" \
./configure --prefix=${BASEDIR}/system || exit 1)
}
get_wt ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
@ -60,6 +100,7 @@ wt_configure ()
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g ../configure --with-pic \
--enable-snappy \
--disable-python --disable-java \
--prefix=${BASEDIR}/system || exit 1)
}
@ -76,8 +117,9 @@ get_snappy ()
get_deps ()
{
get_snappy;
get_wt;
get_snappy;
get_urcu;
}
update_deps ()
@ -91,6 +133,22 @@ update_deps ()
fi
)
fi
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR
if [ "X$URCU_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $URCU_VSN || exit 1
fi
)
fi
}
build_urcu ()
{
urcu_configure;
(cd $BASEDIR/$URCU_DIR && $MAKE -j && $MAKE install)
}
build_wt ()
@ -111,6 +169,7 @@ case "$1" in
clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
[ -e $BASEDIR/$URCU_DIR/Makefile ] && (cd $BASEDIR/$URCU_DIR && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
@ -136,6 +195,11 @@ case "$1" in
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build URCU
[ -d $URCU_DIR ] || get_urcu;
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
@ -147,5 +211,9 @@ case "$1" in
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu-*.so* ${BASEDIR}/../priv
;;
esac
exit 0

View file

@ -42,8 +42,8 @@ extern "C" {
#define DPUTS(arg) ((void) 0)
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET

c_src/urcu-build.patch (new file, 23 lines added)
View file

@ -0,0 +1,23 @@
diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h
index 23bd1ce..98e00b4 100644
--- a/urcu/rculfhash.h
+++ b/urcu/rculfhash.h
@@ -31,6 +31,10 @@
#include <urcu-call-rcu.h>
#include <urcu-flavor.h>
+#ifndef UNUSED
+#define UNUSED(v) ((void)(v))
+#endif
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -92,6 +96,7 @@ typedef int (*cds_lfht_match_fct)(struct cds_lfht_node *node, const void *key);
static inline
void cds_lfht_node_init(struct cds_lfht_node *node)
{
+ UNUSED(node);
}
/*

View file

@ -38,8 +38,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -fdebug-cpp -ftrack-macro-expansion=2 -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lurcu -lurcu-cds -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
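
The DRV_CFLAGS change above also adds GCC's -fdebug-cpp and -ftrack-macro-expansion=2, which matches the commit "Trace line numbers within macros.": with expansion tracking, a diagnostic raised inside a large macro such as ASYNC_NIF_DECL is reported together with the chain of macro expansions rather than only the invocation line. A tiny, hypothetical illustration (not code from this changeset):

/* macro_diag.c (illustrative): compile with
 *   gcc -c -Wall -Wextra -ftrack-macro-expansion=2 macro_diag.c
 * The signed/unsigned comparison inside IN_RANGE draws a -Wsign-compare
 * warning, and with macro-expansion tracking GCC also prints a
 * "note: in expansion of macro 'IN_RANGE'" pointing at the use site,
 * which is what makes warnings buried in ASYNC_NIF_DECL-sized macros
 * practical to locate. */
#define IN_RANGE(i, n) ((i) >= 0 && (i) < (n))

int in_range(int i, unsigned int n)
{
    return IN_RANGE(i, n); /* (i) < (n) compares signed with unsigned */
}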

View file

@ -22,7 +22,7 @@
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F) ->
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
@ -43,9 +43,12 @@
Reply
end;
{error, eagain} ->
F(F);
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F)).
F(F, 1)).