From e9f9d13e47578ea72d74855e795a77b9493a7341 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Sun, 28 Jul 2013 20:16:52 -0400 Subject: [PATCH] WIP: replaced all locked queues in async_nif with lock-free RCU queues, compiles but SEGVs. --- c_src/async_nif.h | 397 ++++++++++++++++------------------- c_src/build_deps.sh | 9 +- c_src/common.h | 4 +- c_src/wiredtiger-build.patch | 14 +- rebar.config | 2 +- 5 files changed, 199 insertions(+), 227 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 89c758a..074991a 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -25,8 +25,11 @@ extern "C" { #endif #include <assert.h> - -#include "queue.h" +#include <urcu.h> +#include <urcu/cds.h> +#include <urcu-call-rcu.h> +#include <urcu/uatomic.h> +#include <urcu/tls-compat.h> #ifndef UNUSED #define UNUSED(v) ((void)(v)) #endif @@ -37,6 +40,9 @@ #define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS +static DEFINE_URCU_TLS(unsigned long long, nr_enqueues); +static DEFINE_URCU_TLS(unsigned long long, nr_dequeues); + struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -44,17 +50,14 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); - STAILQ_ENTRY(async_nif_req_entry) entries; + struct cds_lfq_node_rcu queue_entry; }; - struct async_nif_work_queue { unsigned int num_workers; unsigned int depth; - ErlNifMutex *reqs_mutex; - ErlNifCond *reqs_cnd; + struct cds_lfq_queue_rcu req_queue; struct async_nif_work_queue *next; - STAILQ_HEAD(reqs, async_nif_req_entry) reqs; }; struct async_nif_worker_entry { @@ -62,40 +65,39 @@ struct async_nif_worker_entry { unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; - SLIST_ENTRY(async_nif_worker_entry) entries; + struct cds_lfq_node_rcu queue_entry; }; struct async_nif_state { unsigned int shutdown; - ErlNifMutex *we_mutex; - unsigned int we_active; - SLIST_HEAD(joining, async_nif_worker_entry) we_joining; + unsigned int num_active_workers; + struct cds_lfq_queue_rcu worker_join_queue; unsigned int num_queues; unsigned int next_q; - STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; + struct cds_lfq_queue_rcu recycled_req_queue; unsigned int num_reqs; - ErlNifMutex *recycled_req_mutex; + struct rcu_head rcu; struct async_nif_work_queue queues[]; }; #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ - struct decl ## _args frame; \ - static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ + struct decl##_args frame; \ + static void fn_work_##decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl##_args *args) { \ UNUSED(worker_id); \ DPRINTF("async_nif: calling \"%s\"", __func__); \ do work_block while(0); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \ } \ - static void fn_post_ ## decl (struct decl ## _args *args) { \ + static void fn_post_##decl (struct decl##_args *args) { \ UNUSED(args); \ DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \ do post_block while(0); \ DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ - struct decl ## _args on_stack_args; \ - struct decl ## _args *args = &on_stack_args; \ - struct decl ## _args *copy_of_args; \ + struct decl##_args on_stack_args; \ + struct decl##_args *args = &on_stack_args; \ + struct decl##_args
*copy_of_args; \ struct async_nif_req_entry *req = NULL; \ unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ @@ -104,7 +106,7 @@ struct async_nif_state { argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) { \ + if (uatomic_read(&async_nif->shutdown)) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ } \ @@ -117,25 +119,25 @@ struct async_nif_state { DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \ - copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ + copy_of_args = (struct decl##_args *)enif_alloc(sizeof(struct decl##_args)); \ if (!copy_of_args) { \ - fn_post_ ## decl (args); \ + fn_post_##decl (args); \ async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ - memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ + memcpy(copy_of_args, args, sizeof(struct decl##_args)); \ req->ref = enif_make_copy(new_env, argv_in[0]); \ enif_self(env, &req->pid); \ req->args = (void*)copy_of_args; \ - req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ - req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_##decl ; \ + req->fn_post = (void (*)(void *))fn_post_##decl; \ int h = -1; \ if (affinity) \ h = ((unsigned int)affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ - fn_post_ ## decl (args); \ + fn_post_##decl (args); \ async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ @@ -186,30 +188,39 @@ struct async_nif_state { struct async_nif_req_entry * async_nif_reuse_req(struct async_nif_state *async_nif) { + struct cds_lfq_node_rcu *node; struct async_nif_req_entry *req = NULL; ErlNifEnv *env = NULL; - enif_mutex_lock(async_nif->recycled_req_mutex); - if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { - if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { + /* Look for a request on our Lock-Free/RCU Queue first. */ + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue); + req = caa_container_of(node, struct async_nif_req_entry, queue_entry); + rcu_read_unlock(); + + if (req) { + /* The goal is to reuse these req structs, not malloc/free them + repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`. + We reuse this req, then when exiting we'll free all of them at + once. 
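+
+ N.B. a likely source of the SEGV mentioned in the subject line is the
+ dequeue pattern a few lines up: cds_lfq_dequeue_rcu() returns NULL when
+ the queue is empty, but caa_container_of() applied to a NULL node does
+ not yield NULL (it yields NULL minus the offset of queue_entry), so the
+ enclosing `if (req)` test never fails and an empty queue turns into a
+ wild pointer. A guarded form of the lookup, as a sketch rather than
+ the committed code:
+
+   rcu_read_lock();
+   node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
+   rcu_read_unlock();
+   req = node ? caa_container_of(node, struct async_nif_req_entry,
+                                 queue_entry) : NULL;
+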
*/ + return req; + } else { + if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) { req = enif_alloc(sizeof(struct async_nif_req_entry)); if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); if (env) { req->env = env; - __sync_fetch_and_add(&async_nif->num_reqs, 1); + uatomic_inc(&async_nif->num_reqs); } else { enif_free(req); req = NULL; } } + return req; } - } else { - req = STAILQ_FIRST(&async_nif->recycled_reqs); - STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries); } - enif_mutex_unlock(async_nif->recycled_req_mutex); return req; } @@ -223,14 +234,21 @@ async_nif_reuse_req(struct async_nif_state *async_nif) void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { - ErlNifEnv *env = NULL; - enif_mutex_lock(async_nif->recycled_req_mutex); + /* Three things to do here to prepare this request struct for reuse. + 1) clear the NIF Environment + 2) zero out the req struct except... + 3) keep a pointer to the env so we can reset it in the req */ + ErlNifEnv *env = req->env; enif_clear_env(req->env); - env = req->env; + if (req->args) enif_free(req->args); memset(req, 0, sizeof(struct async_nif_req_entry)); req->env = env; - STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries); - enif_mutex_unlock(async_nif->recycled_req_mutex); + + /* Now enqueue this request on our Lock-Free/RCU Queue to be reused later. */ + cds_lfq_node_init_rcu(&req->queue_entry); + rcu_read_lock(); + cds_lfq_enqueue_rcu(&async_nif->recycled_req_queue, &req->queue_entry); + rcu_read_unlock(); } static void *async_nif_worker_fn(void *); @@ -241,41 +259,38 @@ static void *async_nif_worker_fn(void *); static int async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) { - struct async_nif_worker_entry *we; + struct async_nif_worker_entry *worker; if (0 == q) return EINVAL; - enif_mutex_lock(async_nif->we_mutex); + /* Before creating a new worker thread join threads which have exited. */ + for(;;) { + struct cds_lfq_node_rcu *node; + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue); + worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry); + rcu_read_unlock(); - we = SLIST_FIRST(&async_nif->we_joining); - while(we != NULL) { - struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); - SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); - void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(we->tid, &exit_value); - enif_free(we); - async_nif->we_active--; - we = n; + if (worker) { + void *exit_value = 0; /* We ignore the thread_join's exit value. 
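+
+ N.B. the empty-queue caveat noted in async_nif_reuse_req() applies to
+ this reap loop as well: when worker_join_queue is empty, node is NULL
+ but caa_container_of(node, ...) is non-NULL, so the enclosing
+ `if (worker)` test never fails and the `else break` that should end
+ the for(;;) loop is unreachable. A guarded sketch:
+
+   worker = node ? caa_container_of(node, struct async_nif_worker_entry,
+                                    queue_entry) : NULL;
+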
*/ + enif_thread_join(worker->tid, &exit_value); + enif_free(worker); + uatomic_dec(&async_nif->num_active_workers); + } else + break; } - if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { - enif_mutex_unlock(async_nif->we_mutex); + if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS) return EAGAIN; - } - we = enif_alloc(sizeof(struct async_nif_worker_entry)); - if (!we) { - enif_mutex_unlock(async_nif->we_mutex); - return ENOMEM; - } - memset(we, 0, sizeof(struct async_nif_worker_entry)); - we->worker_id = async_nif->we_active++; - we->async_nif = async_nif; - we->q = q; - - enif_mutex_unlock(async_nif->we_mutex); - return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0); + worker = enif_alloc(sizeof(struct async_nif_worker_entry)); + if (!worker) return ENOMEM; + memset(worker, 0, sizeof(struct async_nif_worker_entry)); + worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1); + worker->async_nif = async_nif; + worker->q = q; + return enif_thread_create(NULL,&worker->tid, &async_nif_worker_fn, (void*)worker, 0); } /** @@ -299,8 +314,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en qid = (unsigned int)hint; } else { do { - last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); - qid = (last_qid + 1) % async_nif->num_queues; + last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); + qid = (last_qid + 1) % async_nif->num_queues; } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid)); } @@ -309,60 +324,53 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en for (i = 0; i < async_nif->num_queues; i++) { /* Compute the average queue depth not counting queues which are empty or the queue we're considering right now. */ - unsigned int j, n = 0; + unsigned int j, d, n = 0; for (j = 0; j < async_nif->num_queues; j++) { - if (j != qid && async_nif->queues[j].depth != 0) { + d = uatomic_read(&async_nif->queues[j].depth); + if (j != qid && d != 0) { n++; - avg_depth += async_nif->queues[j].depth; + avg_depth += d; } } if (avg_depth) avg_depth /= n; - /* Lock this queue under consideration, then check for shutdown. While - we hold this lock either a) we're shutting down so exit now or b) this - queue will be valid until we release the lock. */ q = &async_nif->queues[qid]; - enif_mutex_lock(q->reqs_mutex); - if (async_nif->shutdown) { - enif_mutex_unlock(q->reqs_mutex); + if (uatomic_read(&async_nif->shutdown)) return 0; - } /* Try not to enqueue a request into a queue that isn't keeping up with the request volume. */ - if (q->depth <= avg_depth) break; - else { - enif_mutex_unlock(q->reqs_mutex); - qid = (qid + 1) % async_nif->num_queues; - } + if (uatomic_read(&q->depth) <= avg_depth) break; + else qid = (qid + 1) % async_nif->num_queues; } /* If the for loop finished then we didn't find a suitable queue for this - request, meaning we're backed up so trigger eagain. Note that if we left - the loop in this way we hold no lock. */ + request, meaning we're backed up so trigger eagain. */ if (i == async_nif->num_queues) return 0; /* Add the request to the queue. 
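+
+ Two cautions around the enqueue below. First, liburcu expects both
+ cds_lfq_enqueue_rcu() and cds_lfq_dequeue_rcu() to be called from
+ within an RCU read-side critical section, and a node must be
+ re-initialized with cds_lfq_node_init_rcu() each time it is recycled,
+ hence the three-step pattern repeated at every enqueue site. Second,
+ this function runs on an Erlang scheduler thread, and with the default
+ urcu flavor any thread that calls rcu_read_lock() must first have
+ called rcu_register_thread(); the worker threads register themselves
+ in async_nif_worker_fn(), but nothing registers the scheduler threads,
+ which is another plausible cause of the SEGV. (The round-robin CAS on
+ next_q above also still uses the GCC __sync builtins; uatomic_cmpxchg()
+ would be the consistent choice now that everything else has moved to
+ the uatomic API.)
+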
*/ - STAILQ_INSERT_TAIL(&q->reqs, req, entries); - __sync_fetch_and_add(&q->depth, 1); + cds_lfq_node_init_rcu(&req->queue_entry); + rcu_read_lock(); + cds_lfq_enqueue_rcu(&q->req_queue, &req->queue_entry); + rcu_read_unlock(); + URCU_TLS(nr_enqueues)++; + uatomic_inc(&q->depth); /* We've selected a queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ - while (q->depth > q->num_workers) { + while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) { switch(async_nif_start_worker(async_nif, q)) { case EINVAL: case ENOMEM: default: return 0; case EAGAIN: continue; - case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done; + case 0: uatomic_inc(&q->num_workers); goto done; } - }done:; + } done:; /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), enif_make_atom(req->env, "enqueued")); - enif_cond_signal(q->reqs_cnd); - enif_mutex_unlock(q->reqs_mutex); return reply; } @@ -374,69 +382,46 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en static void * async_nif_worker_fn(void *arg) { - struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg; - unsigned int worker_id = we->worker_id; - struct async_nif_state *async_nif = we->async_nif; - struct async_nif_work_queue *q = we->q; - struct async_nif_req_entry *req = NULL; - unsigned int tries = async_nif->num_queues; + struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg; + unsigned int worker_id = worker->worker_id; + struct async_nif_state *async_nif = worker->async_nif; + struct async_nif_work_queue *l = NULL, *q = worker->q; - for(;;) { - /* Examine the request queue, are there things to be done? */ - enif_mutex_lock(q->reqs_mutex); - check_again_for_work: - if (async_nif->shutdown) { - enif_mutex_unlock(q->reqs_mutex); + // TODO(gburd): set_affinity(); to the CPU_ID for this queue + rcu_register_thread(); + + while(q != l) { + struct cds_lfq_node_rcu *node; + struct async_nif_req_entry *req = NULL; + + if (uatomic_read(&async_nif->shutdown)) break; - } - if (STAILQ_EMPTY(&q->reqs)) { - /* Queue is empty so we wait for more work to arrive. */ - if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { - enif_mutex_unlock(q->reqs_mutex); - if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit - else { - tries--; - __sync_fetch_and_add(&q->num_workers, -1); - q = q->next; - __sync_fetch_and_add(&q->num_workers, 1); - continue; // try another queue - } - } else { - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; - } + + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(&q->req_queue); + req = caa_container_of(node, struct async_nif_req_entry, queue_entry); + rcu_read_unlock(); + + if (req) { + uatomic_dec(&q->depth); + URCU_TLS(nr_dequeues)++; + req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); + req->fn_post(req->args); + async_nif_recycle_req(req, async_nif); + l = q; } else { - /* At this point the next req is ours to process and we hold the - reqs_mutex lock. Take the request off the queue. */ - req = STAILQ_FIRST(&q->reqs); - STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); - __sync_fetch_and_add(&q->depth, -1); - - /* Ensure that there is at least one other worker thread watching this - queue. 
*/ - enif_cond_signal(q->reqs_cnd); - enif_mutex_unlock(q->reqs_mutex); - - /* Perform the work. */ - req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - - /* Now call the post-work cleanup function. */ - req->fn_post(req->args); - - /* Clean up req for reuse. */ - req->ref = 0; - req->fn_work = 0; - req->fn_post = 0; - enif_free(req->args); - req->args = NULL; - async_nif_recycle_req(req, async_nif); - req = NULL; + /* This queue is empty, cycle through other queues looking for work. */ + uatomic_dec(&q->num_workers); + q = q->next; + uatomic_inc(&q->num_workers); } } - enif_mutex_lock(async_nif->we_mutex); - SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); - enif_mutex_unlock(async_nif->we_mutex); - __sync_fetch_and_add(&q->num_workers, -1); + uatomic_dec(&q->num_workers); + cds_lfq_node_init_rcu(&worker->queue_entry); + rcu_read_lock(); + cds_lfq_enqueue_rcu(&async_nif->worker_join_queue, &worker->queue_entry); + rcu_read_unlock(); + rcu_unregister_thread(); enif_thread_exit(0); return 0; } @@ -446,84 +431,68 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) { unsigned int i; unsigned int num_queues = async_nif->num_queues; + struct cds_lfq_node_rcu *node; struct async_nif_work_queue *q = NULL; - struct async_nif_req_entry *req = NULL; - struct async_nif_worker_entry *we = NULL; UNUSED(env); - /* Signal the worker threads, stop what you're doing and exit. To ensure - that we don't race with the enqueue() process we first lock all the worker - queues, then set shutdown to true, then unlock. The enqueue function will - take the queue mutex, then test for shutdown condition, then enqueue only - if not shutting down. */ - for (i = 0; i < num_queues; i++) { - q = &async_nif->queues[i]; - enif_mutex_lock(q->reqs_mutex); - } - /* Set the shutdown flag so that worker threads will no continue - executing requests. */ - async_nif->shutdown = 1; - for (i = 0; i < num_queues; i++) { - q = &async_nif->queues[i]; - enif_mutex_unlock(q->reqs_mutex); - } + /* Signal the worker threads, stop what you're doing and exit. */ + uatomic_set(&async_nif->shutdown, 1); /* Join for the now exiting worker threads. */ - while(async_nif->we_active > 0) { - for (i = 0; i < num_queues; i++) - enif_cond_broadcast(async_nif->queues[i].reqs_cnd); - enif_mutex_lock(async_nif->we_mutex); - we = SLIST_FIRST(&async_nif->we_joining); - while(we != NULL) { - struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); - SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); + while(uatomic_read(&async_nif->num_active_workers) > 0) { + struct async_nif_worker_entry *worker; + struct cds_lfq_node_rcu *node; + rcu_read_lock(); + node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue); + worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry); + rcu_read_unlock(); + + if (worker) { void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(we->tid, &exit_value); - enif_free(we); - async_nif->we_active--; - we = n; + enif_thread_join(worker->tid, &exit_value); + enif_free(worker); + uatomic_dec(&async_nif->num_active_workers); } - enif_mutex_unlock(async_nif->we_mutex); } - enif_mutex_destroy(async_nif->we_mutex); + cds_lfq_destroy_rcu(&async_nif->worker_join_queue); // TODO(gburd): check return val /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; /* Worker threads are stopped, now toss anything left in the queue. 
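+
+ Every worker has been joined by this point, so the drains below are
+ single-threaded, and the `if (node)` guard before caa_container_of()
+ used here is exactly the guard the hot paths above are missing. Note,
+ though, that these dequeues run outside an rcu_read_lock()/unlock()
+ pair, unlike every other call site; wrapping them the same way is
+ cheap and stays within the documented API contract. Also,
+ cds_lfq_destroy_rcu() returns non-zero when a queue still holds nodes,
+ which is worth asserting on given the existing TODOs.
+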
*/ - req = NULL; - req = STAILQ_FIRST(&q->reqs); - while(req != NULL) { - struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); - enif_clear_env(req->env); - enif_send(NULL, &req->pid, req->env, + do { + node = cds_lfq_dequeue_rcu(&q->req_queue); + if (node) { + struct async_nif_req_entry *req; + req = caa_container_of(node, struct async_nif_req_entry, queue_entry); + enif_clear_env(req->env); + enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_atom(req->env, "shutdown"))); - req->fn_post(req->args); - enif_free_env(req->env); - enif_free(req->args); - enif_free(req); - req = n; - } - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); + req->fn_post(req->args); + enif_free(req->args); + enif_free_env(req->env); + enif_free(req); + } + } while(node); + cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val } /* Free any req structures sitting unused on the recycle queue. */ - enif_mutex_lock(async_nif->recycled_req_mutex); - req = NULL; - req = STAILQ_FIRST(&async_nif->recycled_reqs); - while(req != NULL) { - struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); - enif_free_env(req->env); - enif_free(req); - req = n; - } + do { + node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue); + if (node) { + struct async_nif_req_entry *req; + req = caa_container_of(node, struct async_nif_req_entry, queue_entry); + enif_free_env(req->env); + enif_free(req); + } + } while(node); + cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val - enif_mutex_unlock(async_nif->recycled_req_mutex); - enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); + free_all_cpu_call_rcu_data(); enif_free(async_nif); } @@ -539,6 +508,10 @@ async_nif_load() if (has_init) return 0; else has_init = 1; + /* Init the RCU library. */ + rcu_init(); + (void)create_all_cpu_call_rcu_data(0); + /* Find out how many schedulers there are. 
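+
+ A note on ordering: rcu_init() should be called before any other RCU
+ primitive in the process, and create_all_cpu_call_rcu_data() spawns
+ the per-CPU call_rcu helper threads whose counterpart,
+ free_all_cpu_call_rcu_data(), async_nif_unload() already invokes. The
+ call_rcu pointer handed to cds_lfq_init_rcu() further down is what
+ each queue uses to reclaim its internal dummy nodes after a dequeue.
+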
*/ enif_system_info(&info, sizeof(ErlNifSysInfo)); @@ -564,19 +537,15 @@ async_nif_load() sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->we_active = 0; + async_nif->num_active_workers = 0; async_nif->next_q = 0; async_nif->shutdown = 0; - STAILQ_INIT(&async_nif->recycled_reqs); - async_nif->recycled_req_mutex = enif_mutex_create("recycled_req"); - async_nif->we_mutex = enif_mutex_create("we"); - SLIST_INIT(&async_nif->we_joining); + cds_lfq_init_rcu(&async_nif->recycled_req_queue, call_rcu); + cds_lfq_init_rcu(&async_nif->worker_join_queue, call_rcu); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; - STAILQ_INIT(&q->reqs); - q->reqs_mutex = enif_mutex_create("reqs"); - q->reqs_cnd = enif_cond_create("reqs"); + cds_lfq_init_rcu(&q->req_queue, call_rcu); q->next = &async_nif->queues[(i + 1) % num_queues]; } return async_nif; diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 41babd9..2209ace 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -66,8 +66,9 @@ get_urcu () urcu_configure () { (cd $BASEDIR/$URCU_DIR - CFLAGS+="-m64 -Os -g -march=native -mtune=native" \ - ./configure --disable-shared --prefix=${BASEDIR}/system || exit 1) + LDFLAGS+="-Wl,-rpath,lib/urcu-v0.7.7/priv:lib/urcu/priv:priv" \ + CFLAGS+="-m64 -Os -g -march=native -mtune=native -fPIC" \ + ./configure --prefix=${BASEDIR}/system || exit 1) } get_wt () @@ -194,7 +195,7 @@ case "$1" in # Build URCU [ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1) - test -f $BASEDIR/system/lib/liburcu-*.a || build_urcu; + test -f $BASEDIR/system/lib/liburcu.a || build_urcu; # Build Snappy [ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1) @@ -210,6 +211,8 @@ case "$1" in cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv + cp -p -P $BASEDIR/system/lib/liburcu.so* ${BASEDIR}/../priv + cp -p -P $BASEDIR/system/lib/liburcu-*.so* ${BASEDIR}/../priv ;; esac diff --git a/c_src/common.h b/c_src/common.h index df2f162..3cd1de6 100644 --- a/c_src/common.h +++ b/c_src/common.h @@ -42,8 +42,8 @@ extern "C" { #define DPUTS(arg) ((void) 0) #endif -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#ifndef UNUSED +#define UNUSED(v) ((void)(v)) #endif #ifndef COMPQUIET diff --git a/c_src/wiredtiger-build.patch b/c_src/wiredtiger-build.patch index cb619ff..6df4f85 100644 --- a/c_src/wiredtiger-build.patch +++ b/c_src/wiredtiger-build.patch @@ -3,7 +3,7 @@ index 6d78823..2122cf8 100644 --- a/ext/compressors/snappy/Makefile.am +++ b/ext/compressors/snappy/Makefile.am @@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include - + lib_LTLIBRARIES = libwiredtiger_snappy.la libwiredtiger_snappy_la_SOURCES = snappy_compress.c -libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module @@ -15,9 +15,9 @@ index 7e9befe..b924db7 100644 --- a/src/support/cksum.c +++ b/src/support/cksum.c @@ -27,6 +27,13 @@ - + #include "wt_internal.h" - + +#if defined(__amd64) || defined(__x86_64) +#define USE_HARDWARE_CRC32 1 +#else @@ -33,7 +33,7 @@ index 7e9befe..b924db7 100644 #endif }; +#endif /* USE_HARDWARE_CRC32 */ - + /* * __wt_cksum -- @@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len) @@ -55,7 +55,7 @@ index 7e9befe..b924db7 100644 #endif +#endif + } - + /* Checksum in 8B chunks. 
*/ for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) { +#ifdef USE_HARDWARE_CRC32 @@ -73,7 +73,7 @@ index 7e9befe..b924db7 100644 #endif +#endif } - + /* Checksum trailing bytes one byte at a time. */ + for (len &= 0x7; len > 0; ++p, len--) { +#ifdef USE_HARDWARE_CRC32 @@ -90,7 +90,7 @@ index 7e9befe..b924db7 100644 +#endif +#endif + } - + +#ifdef WORDS_BIGENDIAN /* Do final byte swap to produce a result identical to little endian */ crc = diff --git a/rebar.config b/rebar.config index 52c1c94..0040ee1 100644 --- a/rebar.config +++ b/rebar.config @@ -39,7 +39,7 @@ {port_env, [ {"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"}, - {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"} + {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lurcu -lurcu-cds -lwiredtiger"} ]}. {pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
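
For reference, below is a minimal, self-contained sketch of the liburcu lock-free queue idiom this patch adopts. It is illustrative only: the `item` struct, `free_item`, and the demo values are hypothetical, it assumes liburcu 0.7-era headers, and it builds with `gcc demo.c -lurcu -lurcu-cds`. It shows the two details most worth double-checking in the WIP above, namely the NULL guard before caa_container_of() and deferring frees with call_rcu(). Separately, note that in async_nif_worker_fn() the condition `while (q != l)` goes false the first time a request completes, because `l = q` is assigned on success; even with the NULL guard fixed, the worker would exit after its first completed request, so the loop-termination rule probably also needs revisiting.

#include <stdio.h>
#include <stdlib.h>
#include <urcu.h>              /* RCU flavor: rcu_read_lock(), call_rcu() */
#include <urcu/compiler.h>     /* caa_container_of() */
#include <urcu/cds.h>          /* cds_lfq_* lock-free queue */

struct item {
    int value;
    struct cds_lfq_node_rcu node;
    struct rcu_head rcu;
};

static void free_item(struct rcu_head *head)
{
    free(caa_container_of(head, struct item, rcu));
}

int main(void)
{
    struct cds_lfq_queue_rcu q;
    int i;

    rcu_init();                     /* before any other RCU call */
    rcu_register_thread();          /* every thread using RCU must register */
    cds_lfq_init_rcu(&q, call_rcu); /* queue reclaims dummy nodes via call_rcu */

    for (i = 0; i < 3; i++) {
        struct item *it = malloc(sizeof(*it));
        it->value = i;
        cds_lfq_node_init_rcu(&it->node);   /* re-init before every enqueue */
        rcu_read_lock();                    /* enqueue needs a read-side critical section */
        cds_lfq_enqueue_rcu(&q, &it->node);
        rcu_read_unlock();
    }

    for (;;) {
        struct cds_lfq_node_rcu *node;
        struct item *it;

        rcu_read_lock();                    /* dequeue, too */
        node = cds_lfq_dequeue_rcu(&q);
        rcu_read_unlock();
        if (!node)                          /* guard BEFORE caa_container_of */
            break;
        it = caa_container_of(node, struct item, node);
        printf("dequeued %d\n", it->value);
        call_rcu(&it->rcu, free_item);      /* defer the free past any readers */
    }

    cds_lfq_destroy_rcu(&q);                /* queue must be empty here */
    rcu_unregister_thread();
    return 0;
}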