Compare commits

..

9 commits

Author SHA1 Message Date
Gregory Burd
08acdbefc8 Retry three times, then bail out and return not found. 2013-09-04 13:11:02 -04:00
Gregory Burd
033661875e Trace line numbers within macros. 2013-09-04 13:10:16 -04:00
Gregory Burd
9496f99019 WIP: race condition on close causes double-free 2013-09-04 13:09:17 -04:00
Gregory Burd
f4a1126fde Fix a few mistakes from the merge. 2013-08-26 13:07:46 -04:00
Gregory Burd
da418b4abf Merge remote-tracking branch 'origin/master' into gsb-rcu
Conflicts:
	c_src/async_nif.h
	c_src/build_deps.sh
	src/async_nif.hrl
2013-08-21 15:56:31 -04:00
Gregory Burd
a984a6dae0 Adding a small patch to URCU build process. 2013-08-13 08:54:22 -04:00
Gregory Burd
45037cbcca Selective receive requires that the ref is created in the scope of the receive. 2013-07-30 12:53:33 -04:00
Gregory Burd
e9f9d13e47 WIP: replaced all locked queues in async_nif with lock-free RCU queues, compiles but SEGVs. 2013-07-28 20:16:52 -04:00
Gregory Burd
866b2a2ed1 Add the User-Space RCU library. 2013-07-27 08:21:40 -04:00
13 changed files with 475 additions and 345 deletions

4
.gitignore vendored
View file

@ -7,8 +7,6 @@ c_src/*.o
c_src/bzip2-1.0.6 c_src/bzip2-1.0.6
c_src/snappy-1.0.4 c_src/snappy-1.0.4
deps/ deps/
priv/wt priv/
priv/*.so*
priv/*.dylib*
log/ log/
*~ *~

View file

@ -52,17 +52,19 @@ endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \ .PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild update-deps clean-common-test-data rebuild
all: deps compile all: deps compile test
# ============================================================================= # =============================================================================
# Rules to build the system # Rules to build the system
# ============================================================================= # =============================================================================
deps: deps:
c_src/build_deps.sh get-deps
$(REBAR) get-deps $(REBAR) get-deps
$(REBAR) compile $(REBAR) compile
update-deps: update-deps:
c_src/build_deps.sh update-deps
$(REBAR) update-deps $(REBAR) update-deps
$(REBAR) compile $(REBAR) compile

View file

@ -24,9 +24,11 @@
extern "C" { extern "C" {
#endif #endif
#include <assert.h> #include <stdlib.h>
#include <urcu.h> /* RCU flavor */
#include "queue.h" #include <urcu/arch.h> /* RCU Architecture */
#include <urcu/tls-compat.h> /* RCU Thread-local storage */
#include <urcu/rculfqueue.h> /* RCU Lock-free queue */
#ifndef UNUSED #ifndef UNUSED
#define UNUSED(v) ((void)(v)) #define UNUSED(v) ((void)(v))
@ -37,6 +39,9 @@ extern "C" {
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192 #define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
/* Atoms (initialized in on_load) */ /* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN; static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM; static ERL_NIF_TERM ATOM_ENOMEM;
@ -45,7 +50,6 @@ static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK; static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN; static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry { struct async_nif_req_entry {
ERL_NIF_TERM ref; ERL_NIF_TERM ref;
ErlNifEnv *env; ErlNifEnv *env;
@ -53,17 +57,15 @@ struct async_nif_req_entry {
void *args; void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *); void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries; struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
}; };
struct async_nif_work_queue { struct async_nif_work_queue {
unsigned int num_workers; unsigned int num_workers;
unsigned int depth; unsigned int depth;
ErlNifMutex *reqs_mutex; struct cds_lfq_queue_rcu req_queue;
ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next; struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
}; };
struct async_nif_worker_entry { struct async_nif_worker_entry {
@ -71,40 +73,39 @@ struct async_nif_worker_entry {
unsigned int worker_id; unsigned int worker_id;
struct async_nif_state *async_nif; struct async_nif_state *async_nif;
struct async_nif_work_queue *q; struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries; struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
}; };
struct async_nif_state { struct async_nif_state {
unsigned int shutdown; unsigned int shutdown;
ErlNifMutex *we_mutex; unsigned int num_active_workers;
unsigned int we_active; struct cds_lfq_queue_rcu worker_join_queue;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues; unsigned int num_queues;
unsigned int next_q; unsigned int next_q;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; struct cds_lfq_queue_rcu recycled_req_queue;
unsigned int num_reqs; unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[]; struct async_nif_work_queue queues[];
}; };
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \ struct decl##_args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ static void fn_work_##decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl##_args *args) { \
UNUSED(worker_id); \ UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \ DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \ do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \ } \
static void fn_post_ ## decl (struct decl ## _args *args) { \ static void fn_post_##decl (struct decl##_args *args) { \
UNUSED(args); \ UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \ DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \ do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \ DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \ } \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \ struct decl##_args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \ struct decl##_args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \ struct decl##_args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \ struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \ unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \ ErlNifEnv *new_env = NULL; \
@ -124,22 +125,22 @@ struct async_nif_state {
DPRINTF("async_nif: returned from \"%s\"", __func__); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \ } \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ memcpy(copy_of_args, args, sizeof(struct decl##_args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \ req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \ enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \ req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_##decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \ req->fn_post = (void (*)(void *))fn_post_##decl; \
int h = -1; \ int h = -1; \
if (affinity) \ if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \ h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \ fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \ free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
@ -189,30 +190,38 @@ struct async_nif_state {
struct async_nif_req_entry * struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif) async_nif_reuse_req(struct async_nif_state *async_nif)
{ {
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL; ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex); /* Look for a request on our Lock-Free/RCU Queue first. */
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { rcu_read_lock();
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
rcu_read_unlock();
if (node) {
/* The goal is to reuse these req structs, not malloc/free them
repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
We reuse this req, then when exiting we'll free all of them at
once. */
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
} else {
/* The reuse queue is empty, create a new request. */
if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry)); req = malloc(sizeof(struct async_nif_req_entry));
if (req) { if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry)); memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env(); env = enif_alloc_env();
if (env) { if (env) {
req->env = env; req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1); uatomic_inc(&async_nif->num_reqs);
} else { } else {
free(req); free(req);
req = NULL; req = NULL;
} }
} }
} }
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
} }
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req; return req;
} }
@ -226,14 +235,21 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
void void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{ {
ErlNifEnv *env = NULL; /* Three things to do here to prepare this request struct for reuse.
enif_mutex_lock(async_nif->recycled_req_mutex); 1) clear the NIF Environment
2) zero out the req struct except...
3) keep a pointer to the env so we can reset it in the req */
ErlNifEnv *env = req->env;
enif_clear_env(req->env); enif_clear_env(req->env);
env = req->env; if (req->args) free(req->args);
memset(req, 0, sizeof(struct async_nif_req_entry)); memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env; req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex); /* Now enqueue this request on our Lock-Free/RCU Queue to be reused later. */
cds_lfq_node_init_rcu(&req->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&async_nif->recycled_req_queue, &req->queue_entry);
rcu_read_unlock();
} }
static void *async_nif_worker_fn(void *); static void *async_nif_worker_fn(void *);
@ -244,41 +260,38 @@ static void *async_nif_worker_fn(void *);
static int static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{ {
struct async_nif_worker_entry *we; struct async_nif_worker_entry *worker;
if (0 == q) if (0 == q)
return EINVAL; return EINVAL;
enif_mutex_lock(async_nif->we_mutex); /* Before creating a new worker thread join threads which have exited. */
for(;;) {
struct cds_lfq_node_rcu *node;
we = SLIST_FIRST(&async_nif->we_joining); rcu_read_lock();
while(we != NULL) { node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); rcu_read_unlock();
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
if (!node) break; /* Queue is empty. */
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
void *exit_value = 0; /* We ignore the thread_join's exit value. */ void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value); enif_thread_join(worker->tid, &exit_value);
free(we); free(worker);
async_nif->we_active--; uatomic_dec(&async_nif->num_active_workers);
we = n;
} }
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN; return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry)); worker = malloc(sizeof(struct async_nif_worker_entry));
if (!we) { if (!worker) return ENOMEM;
enif_mutex_unlock(async_nif->we_mutex); memset(worker, 0, sizeof(struct async_nif_worker_entry));
return ENOMEM; worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1);
} worker->async_nif = async_nif;
memset(we, 0, sizeof(struct async_nif_worker_entry)); worker->q = q;
we->worker_id = async_nif->we_active++; return enif_thread_create(NULL,&worker->tid, &async_nif_worker_fn, (void*)worker, 0);
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
} }
/** /**
@ -295,6 +308,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0; double avg_depth = 0.0;
//rcu_register_thread();
/* Either we're choosing a queue based on some affinity/hinted value or we /* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */ global value (next_q is shared across worker threads) . */
@ -312,59 +327,58 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or /* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */ the queue we're considering right now. */
unsigned int j, n = 0; unsigned int j, d, n = 0;
for (j = 0; j < async_nif->num_queues; j++) { for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) { d = uatomic_read(&async_nif->queues[j].depth);
if (j != qid && d != 0) {
n++; n++;
avg_depth += async_nif->queues[j].depth; avg_depth += d;
} }
} }
if (avg_depth) avg_depth /= n; if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with /* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */ the request volume. */
if (q->depth <= avg_depth) break; if (uatomic_read(&q->depth) <= avg_depth) break;
else { else qid = (qid + 1) % async_nif->num_queues;
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
}
} }
/* If the for loop finished then we didn't find a suitable queue for this /* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. Note that if we left request (e.g. we're backed up servicing requests) or the shutdown flag was
the loop in this way we hold no lock. */ set. Returning '0' will toss this request and free its resources.*/
if (i == async_nif->num_queues) return 0; if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
return 0;
/* Add the request to the queue. */ /* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries); cds_lfq_node_init_rcu(&req->queue_entry);
__sync_fetch_and_add(&q->depth, 1); rcu_read_lock();
cds_lfq_enqueue_rcu(&q->req_queue, &req->queue_entry);
rcu_read_unlock();
URCU_TLS(nr_enqueues)++;
uatomic_inc(&q->depth);
uatomic_inc(&q->num_workers);
/* We've selected a queue for this new request now check to make sure there are /* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */ enough workers actively processing requests on this queue. */
while (q->depth > q->num_workers) { while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
switch(async_nif_start_worker(async_nif, q)) { switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0; case EINVAL:
case EAGAIN: continue; case ENOMEM:
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done; return 0;
default:
case EAGAIN:
continue;
case 0:
uatomic_inc(&q->num_workers);
goto done;
} }
}done:; }done:;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE; double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK, return enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED, enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full))); enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
} }
/** /**
@ -375,73 +389,46 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
static void * static void *
async_nif_worker_fn(void *arg) async_nif_worker_fn(void *arg)
{ {
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg; struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id; unsigned int worker_id = worker->worker_id;
struct async_nif_state *async_nif = we->async_nif; struct async_nif_state *async_nif = worker->async_nif;
struct async_nif_work_queue *q = we->q; struct async_nif_work_queue *l = NULL, *q = worker->q;
// TODO(gburd): set_affinity(); to the CPU_ID for this queue
rcu_register_thread();
while(q != l) {
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) { if (uatomic_read(&async_nif->shutdown))
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break; break;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */ rcu_read_lock();
enif_cond_signal(q->reqs_cnd); node = cds_lfq_dequeue_rcu(&q->req_queue);
enif_mutex_unlock(q->reqs_mutex); rcu_read_unlock();
/* Perform the work. */ if (node) {
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
uatomic_dec(&q->depth);
URCU_TLS(nr_dequeues)++;
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args); req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif); async_nif_recycle_req(req, async_nif);
req = NULL; l = q;
} else {
/* This queue is empty, cycle through other queues looking for work. */
uatomic_dec(&q->num_workers);
q = q->next;
uatomic_inc(&q->num_workers);
} }
} }
enif_mutex_lock(async_nif->we_mutex); uatomic_dec(&q->num_workers);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); cds_lfq_node_init_rcu(&worker->queue_entry);
enif_mutex_unlock(async_nif->we_mutex); rcu_read_lock();
__sync_fetch_and_add(&q->num_workers, -1); cds_lfq_enqueue_rcu(&async_nif->worker_join_queue, &worker->queue_entry);
rcu_read_unlock();
rcu_unregister_thread();
enif_thread_exit(0); enif_thread_exit(0);
return 0; return 0;
} }
@ -451,83 +438,74 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{ {
unsigned int i; unsigned int i;
unsigned int num_queues = async_nif->num_queues; unsigned int num_queues = async_nif->num_queues;
struct cds_lfq_node_rcu *node;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env); UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To ensure /* Signal the worker threads, stop what you're doing and exit. */
that we don't race with the enqueue() process we first lock all the worker uatomic_set(&async_nif->shutdown, 1);
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will no continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */ /* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) { while(uatomic_read(&async_nif->num_active_workers) > 0) {
for (i = 0; i < num_queues; i++) struct async_nif_worker_entry *worker;
enif_cond_broadcast(async_nif->queues[i].reqs_cnd); struct cds_lfq_node_rcu *node;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining); rcu_read_lock();
while(we != NULL) { node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); rcu_read_unlock();
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
if (node) {
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
void *exit_value = 0; /* We ignore the thread_join's exit value. */ void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value); enif_thread_join(worker->tid, &exit_value);
free(we); free(worker);
async_nif->we_active--; uatomic_dec(&async_nif->num_active_workers);
we = n;
} }
enif_mutex_unlock(async_nif->we_mutex);
} }
enif_mutex_destroy(async_nif->we_mutex); cds_lfq_destroy_rcu(&async_nif->worker_join_queue); // TODO(gburd): check return val
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */ /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i]; q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */ /* Worker threads are stopped, now toss anything left in the queue. */
req = NULL; for (;;) {
req = STAILQ_FIRST(&q->reqs); rcu_read_lock();
while(req != NULL) { node = cds_lfq_dequeue_rcu(&q->req_queue);
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); rcu_read_unlock();
if (!node) break; /* Queue is empty. */
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_clear_env(req->env); enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args); req->fn_post(req->args);
enif_free_env(req->env);
free(req->args); free(req->args);
enif_free_env(req->env);
free(req); free(req);
req = n;
} }
enif_mutex_destroy(q->reqs_mutex); cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
enif_cond_destroy(q->reqs_cnd);
} }
/* Free any req structures sitting unused on the recycle queue. */ /* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex); for (;;) {
req = NULL; rcu_read_lock();
req = STAILQ_FIRST(&async_nif->recycled_reqs); node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
while(req != NULL) { rcu_read_unlock();
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env); if (!node) break; /* Queue is empty. */
free(req);
req = n; struct async_nif_req_entry *req;
} req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_free_env(req->env);
free(req->args);
free(req);
}
cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free_all_cpu_call_rcu_data();
free(async_nif); free(async_nif);
} }
@ -551,6 +529,10 @@ async_nif_load(ErlNifEnv *env)
ATOM_OK = enif_make_atom(env, "ok"); ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown"); ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Init the RCU library. */
rcu_init();
(void)create_all_cpu_call_rcu_data(0);
/* Find out how many schedulers there are. */ /* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo)); enif_system_info(&info, sizeof(ErlNifSysInfo));
@ -566,6 +548,7 @@ async_nif_load(ErlNifEnv *env)
if (num_queues < 2) if (num_queues < 2)
num_queues = 2; num_queues = 2;
} }
num_queues = 1; //TODO remove me.
/* Init our portion of priv_data's module-specific state. */ /* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) + async_nif = malloc(sizeof(struct async_nif_state) +
@ -576,19 +559,15 @@ async_nif_load(ErlNifEnv *env)
sizeof(struct async_nif_work_queue) * num_queues); sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues; async_nif->num_queues = num_queues;
async_nif->we_active = 0; async_nif->num_active_workers = 0;
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs); cds_lfq_init_rcu(&async_nif->recycled_req_queue, call_rcu);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req"); cds_lfq_init_rcu(&async_nif->worker_join_queue, call_rcu);
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs); cds_lfq_init_rcu(&q->req_queue, call_rcu);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues]; q->next = &async_nif->queues[(i + 1) % num_queues];
} }
return async_nif; return async_nif;

View file

@ -1,5 +1,8 @@
#!/bin/bash #!/bin/bash
#t=__.$$
#trap 'rm -f $t; exit 0' 0 1 2 3 13 15
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true" POSIX_SHELL="true"
@ -10,15 +13,22 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e set -e
# WiredTiger
WT_REPO=http://github.com/wiredtiger/wiredtiger.git WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=develop #WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH` #WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6" WT_REF="tags/1.6.4"
#WT_DIR=wiredtiger-`basename $WT_REF` WT_DIR=wiredtiger-`basename $WT_REF`
# Google's Snappy Compression
SNAPPY_VSN="1.0.4" SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN SNAPPY_DIR=snappy-$SNAPPY_VSN
# User-space Read-Copy-Update (RCU)
URCU_REPO=git://git.lttng.org/userspace-rcu.git
URCU_REF="tags/v0.7.7"
URCU_DIR=urcu-`basename $URCU_REF`
[ `basename $PWD` != "c_src" ] && cd c_src [ `basename $PWD` != "c_src" ] && cd c_src
export BASEDIR="$PWD" export BASEDIR="$PWD"
@ -26,10 +36,41 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make} MAKE=${MAKE:-make}
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native" export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib" export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH" export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
get_urcu ()
{
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR && git pull -u) || exit 1
else
if [ "X$URCU_REF" != "X" ]; then
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout refs/$URCU_REF || exit 1)
else
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout -b $URCU_BRANCH origin/$URCU_BRANCH || exit 1)
fi
fi
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$URCU_DIR
[ -e $BASEDIR/urcu-build.patch ] && \
(patch -p1 --forward < $BASEDIR/urcu-build.patch || exit 1 )
autoreconf -fis || exit 1
urcu_configure;
)
}
urcu_configure ()
{
(cd $BASEDIR/$URCU_DIR
LDFLAGS+="-Wl,-rpath,lib/urcu-v0.7.7/priv:lib/urcu/priv:priv" \
CFLAGS+="-m64 -Os -g -march=native -mtune=native -fPIC" \
./configure --prefix=${BASEDIR}/system || exit 1)
}
get_wt () get_wt ()
{ {
if [ -d $BASEDIR/$WT_DIR/.git ]; then if [ -d $BASEDIR/$WT_DIR/.git ]; then
@ -57,8 +98,9 @@ get_wt ()
wt_configure () wt_configure ()
{ {
(cd $BASEDIR/$WT_DIR/build_posix (cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \ CFLAGS+=-g ../configure --with-pic \
--enable-snappy \ --enable-snappy \
--disable-python --disable-java \
--prefix=${BASEDIR}/system || exit 1) --prefix=${BASEDIR}/system || exit 1)
} }
@ -75,8 +117,9 @@ get_snappy ()
get_deps () get_deps ()
{ {
get_snappy;
get_wt; get_wt;
get_snappy;
get_urcu;
} }
update_deps () update_deps ()
@ -90,6 +133,22 @@ update_deps ()
fi fi
) )
fi fi
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR
if [ "X$URCU_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $URCU_VSN || exit 1
fi
)
fi
}
build_urcu ()
{
urcu_configure;
(cd $BASEDIR/$URCU_DIR && $MAKE -j && $MAKE install)
} }
build_wt () build_wt ()
@ -110,6 +169,7 @@ case "$1" in
clean) clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean) (cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
[ -e $BASEDIR/$URCU_DIR/Makefile ] && (cd $BASEDIR/$URCU_DIR && $MAKE clean)
rm -rf system $SNAPPY_DIR rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
@ -130,23 +190,30 @@ case "$1" in
;; ;;
*) *)
shopt -s extglob
SUFFIXES='@(so|dylib)'
# Build Snappy # Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy; [ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1) [ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy; test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build URCU
[ -d $URCU_DIR ] || get_urcu;
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
# Build WiredTiger # Build WiredTiger
[ -d $WT_DIR ] || get_wt; [ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \ test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt; -a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv [ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu-*.so* ${BASEDIR}/../priv
;; ;;
esac esac
exit 0

View file

@ -42,8 +42,8 @@ extern "C" {
#define DPUTS(arg) ((void) 0) #define DPUTS(arg) ((void) 0)
#endif #endif
#ifndef __UNUSED #ifndef UNUSED
#define __UNUSED(v) ((void)(v)) #define UNUSED(v) ((void)(v))
#endif #endif
#ifndef COMPQUIET #ifndef COMPQUIET

23
c_src/urcu-build.patch Normal file
View file

@ -0,0 +1,23 @@
diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h
index 23bd1ce..98e00b4 100644
--- a/urcu/rculfhash.h
+++ b/urcu/rculfhash.h
@@ -31,6 +31,10 @@
#include <urcu-call-rcu.h>
#include <urcu-flavor.h>
+#ifndef UNUSED
+#define UNUSED(v) ((void)(v))
+#endif
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -92,6 +96,7 @@ typedef int (*cds_lfht_match_fct)(struct cds_lfht_node *node, const void *key);
static inline
void cds_lfht_node_init(struct cds_lfht_node *node)
{
+ UNUSED(node);
}
/*

View file

@ -1,5 +1,5 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..c423590 100644 index 6d78823..2122cf8 100644
--- a/ext/compressors/snappy/Makefile.am --- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am +++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include @@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@ -10,3 +10,97 @@ index 6d78823..c423590 100644
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include +libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv +libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/support/cksum.c b/src/support/cksum.c
index 7e9befe..b924db7 100644
--- a/src/support/cksum.c
+++ b/src/support/cksum.c
@@ -27,6 +27,13 @@
#include "wt_internal.h"
+#if defined(__amd64) || defined(__x86_64)
+#define USE_HARDWARE_CRC32 1
+#else
+#undef USE_HARDWARE_CRC32
+#endif
+
+#ifdef USE_HARDWARE_CRC32
static const uint32_t g_crc_slicing[8][256] = {
#ifdef WORDS_BIGENDIAN
/*
@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = {
}
#endif
};
+#endif /* USE_HARDWARE_CRC32 */
/*
* __wt_cksum --
@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len)
/* Checksum one byte at a time to the first 4B boundary. */
for (p = chunk;
((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 &&
- len > 0; ++p, --len)
+ len > 0; ++p, --len) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
#else
crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
+#endif
+ }
/* Checksum in 8B chunks. */
for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__ (
+ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;"
+ : "=S"(crc)
+ : "S"(crc), "c"(*p));
+#else
crc ^= *(uint32_t *)p;
p += sizeof(uint32_t);
next = *(uint32_t *)p;
@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len)
g_crc_slicing[1][(next >> 16) & 0xFF] ^
g_crc_slicing[0][(next >> 24)];
#endif
+#endif
}
/* Checksum trailing bytes one byte at a time. */
+ for (len &= 0x7; len > 0; ++p, len--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
- for (len &= 0x7; len > 0; ++p, len--)
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
+#else
+ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
+#endif
+#endif
+ }
+#ifdef WORDS_BIGENDIAN
/* Do final byte swap to produce a result identical to little endian */
crc =
((crc << 24) & 0xFF000000) |
((crc << 8) & 0x00FF0000) |
((crc >> 8) & 0x0000FF00) |
((crc >> 24) & 0x000000FF);
-#else
- for (len &= 0x7; len > 0; ++p, len--)
- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
return (~crc);
}

View file

@ -440,7 +440,6 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
return; return;
} }
/** /**
* Callback to handle error messages. * Callback to handle error messages.
* *
@ -455,15 +454,13 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
* operation or library failure. * operation or library failure.
*/ */
int int
__wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
int error, const char *message)
{ {
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env; ErlNifEnv *msg_env;
ErlNifPid *to_pid; ErlNifPid *to_pid;
int rc = 0; int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->error_mutex); enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error; msg_env = eh->msg_env_error;
to_pid = &eh->to_pid; to_pid = &eh->to_pid;
@ -495,14 +492,13 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
* operation or library failure. * operation or library failure.
*/ */
int int
__wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *message) __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
{ {
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env; ErlNifEnv *msg_env;
ErlNifPid *to_pid; ErlNifPid *to_pid;
int rc = 0; int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->message_mutex); enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message; msg_env = eh->msg_env_message;
to_pid = &eh->to_pid; to_pid = &eh->to_pid;
@ -533,14 +529,13 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const ch
* operation or library failure. * operation or library failure.
*/ */
int int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *operation, uint64_t counter) __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
{ {
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler; struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env; ErlNifEnv *msg_env;
ErlNifPid *to_pid; ErlNifPid *to_pid;
int rc = 0; int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->progress_mutex); enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress; msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid; to_pid = &eh->to_pid;
@ -2308,7 +2303,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
char msg[1024]; char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn); snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, NULL, msg); __wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
return 0; return 0;
} }

View file

@ -1,6 +0,0 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.

View file

@ -38,8 +38,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}. {port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -fdebug-cpp -ftrack-macro-expansion=2 -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"} {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lurcu -lurcu-cds -lwiredtiger"}
]}. ]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}. {pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.

View file

@ -22,7 +22,6 @@
-module(riak_kv_wterl_backend). -module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend). -behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]).
%% KV Backend API %% KV Backend API
-export([api_version/0, -export([api_version/0,
@ -43,7 +42,7 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compiel(export_all).
-endif. -endif.
-define(API_VERSION, 1). -define(API_VERSION, 1).
@ -120,14 +119,14 @@ start(Partition, Config) ->
"lsm" -> "lsm" ->
[{internal_page_max, "128K"}, [{internal_page_max, "128K"},
{leaf_page_max, "16K"}, {leaf_page_max, "16K"},
{lsm, [ {lsm_chunk_size, "100MB"},
{bloom_config, [{leaf_page_max, "8MB"}]}, {lsm_merge_threads, 2},
{bloom_bit_count, 28}, {prefix_compression, true},
{bloom_hash_count, 19}, {lsm_bloom_newest, true},
{bloom_oldest, true}, {lsm_bloom_oldest, true} ,
{chunk_size, "100MB"}, {lsm_bloom_bit_count, 28},
{merge_threads, 2} {lsm_bloom_hash_count, 19},
]} {lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor; ] ++ Compressor;
"table" -> "table" ->
Compressor Compressor
@ -342,23 +341,22 @@ is_empty(#state{connection=Connection, table=Table}) ->
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) -> status(#state{connection=Connection, table=Table}) ->
[]. case wterl:cursor_open(Connection, Table) of
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of {ok, Cursor} ->
%% {ok, Cursor} -> TheStats =
%% TheStats = case fetch_status(Cursor) of
%% case fetch_status(Cursor) of {ok, Stats} ->
%% {ok, Stats} -> Stats;
%% Stats; {error, {eperm, _}} -> % TODO: review/fix this logic
%% {error, {eperm, _}} -> % TODO: review/fix this logic {ok, []};
%% {ok, []}; _ ->
%% _ -> {ok, []}
%% {ok, []} end,
%% end, wterl:cursor_close(Cursor),
%% wterl:cursor_close(Cursor), TheStats;
%% TheStats; {error, Reason2} ->
%% {error, Reason2} -> {error, Reason2}
%% {error, Reason2} end.
%% end.
%% @doc Register an asynchronous callback %% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}. -spec callback(reference(), any(), state()) -> {ok, state()}.
@ -401,41 +399,30 @@ establish_connection(Config, Type) ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")), ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options: %% WT Connection Options:
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false), %% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting = CheckpointSetting =
case Type =:= "lsm" of case Type =:= "lsm" of
true -> true ->
case LogSetting of [];
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
false -> false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]) app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end, end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts = ConnectionOpts =
orddict:from_list( orddict:from_list(
[ wterl:config_value(create, Config, true), [ wterl:config_value(create, Config, true),
wterl:config_value(checkpoint_sync, Config, false), wterl:config_value(sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(session_max, Config, max_sessions(Config)), wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify" wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional % Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80" % verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... you've been warned. % no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm", %"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver", %"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt" %"hazard", "mutex", "ckpt"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options: %% WT Session Options:
SessionOpts = [{isolation, "snapshot"}], SessionOpts = [{isolation, "snapshot"}],
@ -556,15 +543,15 @@ from_index_key(LKey) ->
%% @private %% @private
%% Return all status from wterl statistics cursor %% Return all status from wterl statistics cursor
%% fetch_status(Cursor) -> fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}. {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) -> fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc); lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) -> fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc); lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) -> fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) -> size_cache(RequestedSize) ->
Size = Size =

View file

@ -96,8 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "942e51b"}, [{wterl_vsn, "53307e8"},
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these {wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -454,26 +454,17 @@ config_to_bin([], Acc) ->
config_to_bin([{Key, Value} | Rest], Acc) -> config_to_bin([{Key, Value} | Rest], Acc) ->
ConfigTypes = ConfigTypes =
[{block_compressor, {string, quoted}}, [{block_compressor, {string, quoted}},
{bloom_bit_count, integer},
{bloom_config, config},
{bloom_hash_count, integer},
{bloom_newest, bool},
{bloom_oldest, bool},
{cache_size, string}, {cache_size, string},
{checkpoint, config}, {checkpoint, config},
{checkpoint_sync, bool},
{checksum, string}, {checksum, string},
{chunk_size, string},
{create, bool}, {create, bool},
{direct_io, list}, {direct_io, list},
{drop, list}, {drop, list},
{enabled, bool},
{error_prefix, string}, {error_prefix, string},
{eviction_target, integer}, {eviction_target, integer},
{eviction_trigger, integer}, {eviction_trigger, integer},
{extensions, {list, quoted}}, {extensions, {list, quoted}},
{statistics_fast, bool}, {statistics_fast, bool},
{file_max, string},
{force, bool}, {force, bool},
{from, string}, {from, string},
{hazard_max, integer}, {hazard_max, integer},
@ -483,21 +474,24 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{isolation, string}, {isolation, string},
{key_type, string}, {key_type, string},
{leaf_page_max, string}, {leaf_page_max, string},
{log, config}, {logging, bool},
{lsm, config}, {lsm_bloom_bit_count, integer},
{mmap, bool}, {lsm_bloom_config, config},
{merge_threads, integer}, {lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{prefix_compression, bool},
{lsm_merge_threads, integer},
{multiprocess, bool}, {multiprocess, bool},
{name, string}, {name, string},
{overwrite, bool}, {overwrite, bool},
{prefix_compression, bool},
{raw, bool}, {raw, bool},
{session_max, integer}, {session_max, integer},
{statistics, list},
{statistics_log, config}, {statistics_log, config},
{sync, bool},
{target, {list, quoted}}, {target, {list, quoted}},
{to, string}, {to, string},
{transaction_sync, string},
{transactional, bool}, {transactional, bool},
{verbose, list}, {verbose, list},
{wait, integer}], {wait, integer}],

View file

@ -25,7 +25,7 @@
{mode, max}. {mode, max}.
{duration, 10}. {duration, 10}.
{concurrent, 16}. {concurrent, 4}.
{report_interval, 1}. {report_interval, 1}.
{pb_timeout_general, 1000}. % ms {pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}. %{pb_timeout_read, ?}.
@ -43,9 +43,7 @@
{wterl, [ {wterl, [
{connection, [ {connection, [
{create, true}, {create, true},
{session_sync, false}, {sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024}, {session_max, 1024},
{cache_size, 4294967296}, {cache_size, 4294967296},
{verbose, []}, {verbose, []},
@ -60,11 +58,11 @@
]}, ]},
{session, [ {isolation, "snapshot"} ]}, {session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"}, {table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [ {table, [
{internal_page_max, "128K"}, {internal_page_max, "128K"},
{leaf_page_max, "128K"}, {leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"}, {lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true}, {lsm_bloom_newest, true},
{lsm_bloom_oldest, true} , {lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128}, {lsm_bloom_bit_count, 128},
@ -78,9 +76,7 @@
{wterl_, [ {wterl_, [
{connection, [ {connection, [
{create, true}, {create, true},
{session_sync, false}, {sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024}, {session_max, 1024},
{cache_size, 4294967296}, {cache_size, 4294967296},
{verbose, []}, {verbose, []},
@ -97,6 +93,7 @@
{session, [ {isolation, "snapshot"} ]}, {session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"}, {table_uri, "table:test"},
{table, [ {table, [
{prefix_compression, false},
{block_compressor, "snappy"} % bzip2 {block_compressor, "snappy"} % bzip2
]} ]}
]}. ]}.