Compare commits
9 commits
Author | SHA1 | Date | |
---|---|---|---|
|
08acdbefc8 | ||
|
033661875e | ||
|
9496f99019 | ||
|
f4a1126fde | ||
|
da418b4abf | ||
|
a984a6dae0 | ||
|
45037cbcca | ||
|
e9f9d13e47 | ||
|
866b2a2ed1 |
13 changed files with 475 additions and 345 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -7,8 +7,6 @@ c_src/*.o
|
||||||
c_src/bzip2-1.0.6
|
c_src/bzip2-1.0.6
|
||||||
c_src/snappy-1.0.4
|
c_src/snappy-1.0.4
|
||||||
deps/
|
deps/
|
||||||
priv/wt
|
priv/
|
||||||
priv/*.so*
|
|
||||||
priv/*.dylib*
|
|
||||||
log/
|
log/
|
||||||
*~
|
*~
|
||||||
|
|
4
Makefile
4
Makefile
|
@ -52,17 +52,19 @@ endif
|
||||||
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
|
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
|
||||||
update-deps clean-common-test-data rebuild
|
update-deps clean-common-test-data rebuild
|
||||||
|
|
||||||
all: deps compile
|
all: deps compile test
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# Rules to build the system
|
# Rules to build the system
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
deps:
|
deps:
|
||||||
|
c_src/build_deps.sh get-deps
|
||||||
$(REBAR) get-deps
|
$(REBAR) get-deps
|
||||||
$(REBAR) compile
|
$(REBAR) compile
|
||||||
|
|
||||||
update-deps:
|
update-deps:
|
||||||
|
c_src/build_deps.sh update-deps
|
||||||
$(REBAR) update-deps
|
$(REBAR) update-deps
|
||||||
$(REBAR) compile
|
$(REBAR) compile
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,11 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <assert.h>
|
#include <stdlib.h>
|
||||||
|
#include <urcu.h> /* RCU flavor */
|
||||||
#include "queue.h"
|
#include <urcu/arch.h> /* RCU Architecture */
|
||||||
|
#include <urcu/tls-compat.h> /* RCU Thread-local storage */
|
||||||
|
#include <urcu/rculfqueue.h> /* RCU Lock-free queue */
|
||||||
|
|
||||||
#ifndef UNUSED
|
#ifndef UNUSED
|
||||||
#define UNUSED(v) ((void)(v))
|
#define UNUSED(v) ((void)(v))
|
||||||
|
@ -37,6 +39,9 @@ extern "C" {
|
||||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
|
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
|
||||||
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
||||||
|
|
||||||
|
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
|
||||||
|
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
|
||||||
|
|
||||||
/* Atoms (initialized in on_load) */
|
/* Atoms (initialized in on_load) */
|
||||||
static ERL_NIF_TERM ATOM_EAGAIN;
|
static ERL_NIF_TERM ATOM_EAGAIN;
|
||||||
static ERL_NIF_TERM ATOM_ENOMEM;
|
static ERL_NIF_TERM ATOM_ENOMEM;
|
||||||
|
@ -45,7 +50,6 @@ static ERL_NIF_TERM ATOM_ERROR;
|
||||||
static ERL_NIF_TERM ATOM_OK;
|
static ERL_NIF_TERM ATOM_OK;
|
||||||
static ERL_NIF_TERM ATOM_SHUTDOWN;
|
static ERL_NIF_TERM ATOM_SHUTDOWN;
|
||||||
|
|
||||||
|
|
||||||
struct async_nif_req_entry {
|
struct async_nif_req_entry {
|
||||||
ERL_NIF_TERM ref;
|
ERL_NIF_TERM ref;
|
||||||
ErlNifEnv *env;
|
ErlNifEnv *env;
|
||||||
|
@ -53,17 +57,15 @@ struct async_nif_req_entry {
|
||||||
void *args;
|
void *args;
|
||||||
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
|
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
|
||||||
void (*fn_post)(void *);
|
void (*fn_post)(void *);
|
||||||
STAILQ_ENTRY(async_nif_req_entry) entries;
|
struct cds_lfq_node_rcu queue_entry;
|
||||||
|
struct rcu_head rcu_head;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
struct async_nif_work_queue {
|
struct async_nif_work_queue {
|
||||||
unsigned int num_workers;
|
unsigned int num_workers;
|
||||||
unsigned int depth;
|
unsigned int depth;
|
||||||
ErlNifMutex *reqs_mutex;
|
struct cds_lfq_queue_rcu req_queue;
|
||||||
ErlNifCond *reqs_cnd;
|
|
||||||
struct async_nif_work_queue *next;
|
struct async_nif_work_queue *next;
|
||||||
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct async_nif_worker_entry {
|
struct async_nif_worker_entry {
|
||||||
|
@ -71,19 +73,18 @@ struct async_nif_worker_entry {
|
||||||
unsigned int worker_id;
|
unsigned int worker_id;
|
||||||
struct async_nif_state *async_nif;
|
struct async_nif_state *async_nif;
|
||||||
struct async_nif_work_queue *q;
|
struct async_nif_work_queue *q;
|
||||||
SLIST_ENTRY(async_nif_worker_entry) entries;
|
struct cds_lfq_node_rcu queue_entry;
|
||||||
|
struct rcu_head rcu_head;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct async_nif_state {
|
struct async_nif_state {
|
||||||
unsigned int shutdown;
|
unsigned int shutdown;
|
||||||
ErlNifMutex *we_mutex;
|
unsigned int num_active_workers;
|
||||||
unsigned int we_active;
|
struct cds_lfq_queue_rcu worker_join_queue;
|
||||||
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
|
|
||||||
unsigned int num_queues;
|
unsigned int num_queues;
|
||||||
unsigned int next_q;
|
unsigned int next_q;
|
||||||
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
|
struct cds_lfq_queue_rcu recycled_req_queue;
|
||||||
unsigned int num_reqs;
|
unsigned int num_reqs;
|
||||||
ErlNifMutex *recycled_req_mutex;
|
|
||||||
struct async_nif_work_queue queues[];
|
struct async_nif_work_queue queues[];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -189,30 +190,38 @@ struct async_nif_state {
|
||||||
struct async_nif_req_entry *
|
struct async_nif_req_entry *
|
||||||
async_nif_reuse_req(struct async_nif_state *async_nif)
|
async_nif_reuse_req(struct async_nif_state *async_nif)
|
||||||
{
|
{
|
||||||
|
struct cds_lfq_node_rcu *node;
|
||||||
struct async_nif_req_entry *req = NULL;
|
struct async_nif_req_entry *req = NULL;
|
||||||
ErlNifEnv *env = NULL;
|
ErlNifEnv *env = NULL;
|
||||||
|
|
||||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
/* Look for a request on our Lock-Free/RCU Queue first. */
|
||||||
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
|
rcu_read_lock();
|
||||||
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
|
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
|
||||||
|
rcu_read_unlock();
|
||||||
|
|
||||||
|
if (node) {
|
||||||
|
/* The goal is to reuse these req structs, not malloc/free them
|
||||||
|
repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
|
||||||
|
We reuse this req, then when exiting we'll free all of them at
|
||||||
|
once. */
|
||||||
|
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
|
||||||
|
} else {
|
||||||
|
/* The reuse queue is empty, create a new request. */
|
||||||
|
if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
|
||||||
req = malloc(sizeof(struct async_nif_req_entry));
|
req = malloc(sizeof(struct async_nif_req_entry));
|
||||||
if (req) {
|
if (req) {
|
||||||
memset(req, 0, sizeof(struct async_nif_req_entry));
|
memset(req, 0, sizeof(struct async_nif_req_entry));
|
||||||
env = enif_alloc_env();
|
env = enif_alloc_env();
|
||||||
if (env) {
|
if (env) {
|
||||||
req->env = env;
|
req->env = env;
|
||||||
__sync_fetch_and_add(&async_nif->num_reqs, 1);
|
uatomic_inc(&async_nif->num_reqs);
|
||||||
} else {
|
} else {
|
||||||
free(req);
|
free(req);
|
||||||
req = NULL;
|
req = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
req = STAILQ_FIRST(&async_nif->recycled_reqs);
|
|
||||||
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
|
|
||||||
}
|
}
|
||||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
|
||||||
return req;
|
return req;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,14 +235,21 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
|
||||||
void
|
void
|
||||||
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
|
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
|
||||||
{
|
{
|
||||||
ErlNifEnv *env = NULL;
|
/* Three things to do here to prepare this request struct for reuse.
|
||||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
1) clear the NIF Environment
|
||||||
|
2) zero out the req struct except...
|
||||||
|
3) keep a pointer to the env so we can reset it in the req */
|
||||||
|
ErlNifEnv *env = req->env;
|
||||||
enif_clear_env(req->env);
|
enif_clear_env(req->env);
|
||||||
env = req->env;
|
if (req->args) free(req->args);
|
||||||
memset(req, 0, sizeof(struct async_nif_req_entry));
|
memset(req, 0, sizeof(struct async_nif_req_entry));
|
||||||
req->env = env;
|
req->env = env;
|
||||||
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
|
|
||||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
/* Now enqueue this request on our Lock-Free/RCU Queue to be reused later. */
|
||||||
|
cds_lfq_node_init_rcu(&req->queue_entry);
|
||||||
|
rcu_read_lock();
|
||||||
|
cds_lfq_enqueue_rcu(&async_nif->recycled_req_queue, &req->queue_entry);
|
||||||
|
rcu_read_unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *async_nif_worker_fn(void *);
|
static void *async_nif_worker_fn(void *);
|
||||||
|
@ -244,41 +260,38 @@ static void *async_nif_worker_fn(void *);
|
||||||
static int
|
static int
|
||||||
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
|
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
|
||||||
{
|
{
|
||||||
struct async_nif_worker_entry *we;
|
struct async_nif_worker_entry *worker;
|
||||||
|
|
||||||
if (0 == q)
|
if (0 == q)
|
||||||
return EINVAL;
|
return EINVAL;
|
||||||
|
|
||||||
enif_mutex_lock(async_nif->we_mutex);
|
/* Before creating a new worker thread join threads which have exited. */
|
||||||
|
for(;;) {
|
||||||
|
struct cds_lfq_node_rcu *node;
|
||||||
|
|
||||||
we = SLIST_FIRST(&async_nif->we_joining);
|
rcu_read_lock();
|
||||||
while(we != NULL) {
|
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
|
||||||
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
|
rcu_read_unlock();
|
||||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
|
||||||
|
if (!node) break; /* Queue is empty. */
|
||||||
|
|
||||||
|
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
|
||||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||||
enif_thread_join(we->tid, &exit_value);
|
enif_thread_join(worker->tid, &exit_value);
|
||||||
free(we);
|
free(worker);
|
||||||
async_nif->we_active--;
|
uatomic_dec(&async_nif->num_active_workers);
|
||||||
we = n;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
|
if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
}
|
|
||||||
|
|
||||||
we = malloc(sizeof(struct async_nif_worker_entry));
|
worker = malloc(sizeof(struct async_nif_worker_entry));
|
||||||
if (!we) {
|
if (!worker) return ENOMEM;
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
memset(worker, 0, sizeof(struct async_nif_worker_entry));
|
||||||
return ENOMEM;
|
worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1);
|
||||||
}
|
worker->async_nif = async_nif;
|
||||||
memset(we, 0, sizeof(struct async_nif_worker_entry));
|
worker->q = q;
|
||||||
we->worker_id = async_nif->we_active++;
|
return enif_thread_create(NULL,&worker->tid, &async_nif_worker_fn, (void*)worker, 0);
|
||||||
we->async_nif = async_nif;
|
|
||||||
we->q = q;
|
|
||||||
|
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
|
||||||
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -295,6 +308,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
struct async_nif_work_queue *q = NULL;
|
struct async_nif_work_queue *q = NULL;
|
||||||
double avg_depth = 0.0;
|
double avg_depth = 0.0;
|
||||||
|
|
||||||
|
//rcu_register_thread();
|
||||||
|
|
||||||
/* Either we're choosing a queue based on some affinity/hinted value or we
|
/* Either we're choosing a queue based on some affinity/hinted value or we
|
||||||
need to select the next queue in the rotation and atomically update that
|
need to select the next queue in the rotation and atomically update that
|
||||||
global value (next_q is shared across worker threads) . */
|
global value (next_q is shared across worker threads) . */
|
||||||
|
@ -312,59 +327,58 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
for (i = 0; i < async_nif->num_queues; i++) {
|
for (i = 0; i < async_nif->num_queues; i++) {
|
||||||
/* Compute the average queue depth not counting queues which are empty or
|
/* Compute the average queue depth not counting queues which are empty or
|
||||||
the queue we're considering right now. */
|
the queue we're considering right now. */
|
||||||
unsigned int j, n = 0;
|
unsigned int j, d, n = 0;
|
||||||
for (j = 0; j < async_nif->num_queues; j++) {
|
for (j = 0; j < async_nif->num_queues; j++) {
|
||||||
if (j != qid && async_nif->queues[j].depth != 0) {
|
d = uatomic_read(&async_nif->queues[j].depth);
|
||||||
|
if (j != qid && d != 0) {
|
||||||
n++;
|
n++;
|
||||||
avg_depth += async_nif->queues[j].depth;
|
avg_depth += d;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (avg_depth) avg_depth /= n;
|
if (avg_depth) avg_depth /= n;
|
||||||
|
|
||||||
/* Lock this queue under consideration, then check for shutdown. While
|
|
||||||
we hold this lock either a) we're shutting down so exit now or b) this
|
|
||||||
queue will be valid until we release the lock. */
|
|
||||||
q = &async_nif->queues[qid];
|
q = &async_nif->queues[qid];
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
|
||||||
|
|
||||||
/* Try not to enqueue a request into a queue that isn't keeping up with
|
/* Try not to enqueue a request into a queue that isn't keeping up with
|
||||||
the request volume. */
|
the request volume. */
|
||||||
if (q->depth <= avg_depth) break;
|
if (uatomic_read(&q->depth) <= avg_depth) break;
|
||||||
else {
|
else qid = (qid + 1) % async_nif->num_queues;
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
qid = (qid + 1) % async_nif->num_queues;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If the for loop finished then we didn't find a suitable queue for this
|
/* If the for loop finished then we didn't find a suitable queue for this
|
||||||
request, meaning we're backed up so trigger eagain. Note that if we left
|
request (e.g. we're backed up servicing requests) or the shutdown flag was
|
||||||
the loop in this way we hold no lock. */
|
set. Returning '0' will toss this request and free its resources.*/
|
||||||
if (i == async_nif->num_queues) return 0;
|
if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
|
||||||
|
return 0;
|
||||||
|
|
||||||
/* Add the request to the queue. */
|
/* Add the request to the queue. */
|
||||||
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
|
cds_lfq_node_init_rcu(&req->queue_entry);
|
||||||
__sync_fetch_and_add(&q->depth, 1);
|
rcu_read_lock();
|
||||||
|
cds_lfq_enqueue_rcu(&q->req_queue, &req->queue_entry);
|
||||||
|
rcu_read_unlock();
|
||||||
|
URCU_TLS(nr_enqueues)++;
|
||||||
|
uatomic_inc(&q->depth);
|
||||||
|
uatomic_inc(&q->num_workers);
|
||||||
|
|
||||||
/* We've selected a queue for this new request now check to make sure there are
|
/* We've selected a queue for this new request now check to make sure there are
|
||||||
enough workers actively processing requests on this queue. */
|
enough workers actively processing requests on this queue. */
|
||||||
while (q->depth > q->num_workers) {
|
while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
|
||||||
switch(async_nif_start_worker(async_nif, q)) {
|
switch(async_nif_start_worker(async_nif, q)) {
|
||||||
case EINVAL: case ENOMEM: default: return 0;
|
case EINVAL:
|
||||||
case EAGAIN: continue;
|
case ENOMEM:
|
||||||
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
|
return 0;
|
||||||
|
default:
|
||||||
|
case EAGAIN:
|
||||||
|
continue;
|
||||||
|
case 0:
|
||||||
|
uatomic_inc(&q->num_workers);
|
||||||
|
goto done;
|
||||||
}
|
}
|
||||||
}done:;
|
}done:;
|
||||||
|
|
||||||
/* Build the term before releasing the lock so as not to race on the use of
|
|
||||||
the req pointer (which will soon become invalid in another thread
|
|
||||||
performing the request). */
|
|
||||||
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
|
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
|
||||||
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
|
return enif_make_tuple2(req->env, ATOM_OK,
|
||||||
enif_make_tuple2(req->env, ATOM_ENQUEUED,
|
enif_make_tuple2(req->env, ATOM_ENQUEUED,
|
||||||
enif_make_double(req->env, pct_full)));
|
enif_make_double(req->env, pct_full)));
|
||||||
enif_cond_signal(q->reqs_cnd);
|
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
return reply;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -375,73 +389,46 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
static void *
|
static void *
|
||||||
async_nif_worker_fn(void *arg)
|
async_nif_worker_fn(void *arg)
|
||||||
{
|
{
|
||||||
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
|
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
|
||||||
unsigned int worker_id = we->worker_id;
|
unsigned int worker_id = worker->worker_id;
|
||||||
struct async_nif_state *async_nif = we->async_nif;
|
struct async_nif_state *async_nif = worker->async_nif;
|
||||||
struct async_nif_work_queue *q = we->q;
|
struct async_nif_work_queue *l = NULL, *q = worker->q;
|
||||||
|
|
||||||
|
// TODO(gburd): set_affinity(); to the CPU_ID for this queue
|
||||||
|
rcu_register_thread();
|
||||||
|
|
||||||
|
while(q != l) {
|
||||||
|
struct cds_lfq_node_rcu *node;
|
||||||
struct async_nif_req_entry *req = NULL;
|
struct async_nif_req_entry *req = NULL;
|
||||||
unsigned int tries = async_nif->num_queues;
|
|
||||||
|
|
||||||
for(;;) {
|
if (uatomic_read(&async_nif->shutdown))
|
||||||
/* Examine the request queue, are there things to be done? */
|
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
|
||||||
check_again_for_work:
|
|
||||||
if (async_nif->shutdown) {
|
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
if (STAILQ_EMPTY(&q->reqs)) {
|
|
||||||
/* Queue is empty so we wait for more work to arrive. */
|
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
if (tries == 0 && q == we->q) {
|
|
||||||
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
|
|
||||||
/* At this point we've tried to find/execute work on all queues
|
|
||||||
* and there are at least MIN_WORKERS on this queue so we
|
|
||||||
* leaving this loop (break) which leads to a thread exit/join. */
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
|
||||||
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
|
|
||||||
goto check_again_for_work;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tries--;
|
|
||||||
__sync_fetch_and_add(&q->num_workers, -1);
|
|
||||||
q = q->next;
|
|
||||||
__sync_fetch_and_add(&q->num_workers, 1);
|
|
||||||
continue; // try next queue
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* At this point the next req is ours to process and we hold the
|
|
||||||
reqs_mutex lock. Take the request off the queue. */
|
|
||||||
req = STAILQ_FIRST(&q->reqs);
|
|
||||||
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
|
|
||||||
__sync_fetch_and_add(&q->depth, -1);
|
|
||||||
|
|
||||||
/* Wake up other worker thread watching this queue to help process work. */
|
rcu_read_lock();
|
||||||
enif_cond_signal(q->reqs_cnd);
|
node = cds_lfq_dequeue_rcu(&q->req_queue);
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
rcu_read_unlock();
|
||||||
|
|
||||||
/* Perform the work. */
|
if (node) {
|
||||||
|
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
|
||||||
|
uatomic_dec(&q->depth);
|
||||||
|
URCU_TLS(nr_dequeues)++;
|
||||||
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
|
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
|
||||||
|
|
||||||
/* Now call the post-work cleanup function. */
|
|
||||||
req->fn_post(req->args);
|
req->fn_post(req->args);
|
||||||
|
|
||||||
/* Clean up req for reuse. */
|
|
||||||
req->ref = 0;
|
|
||||||
req->fn_work = 0;
|
|
||||||
req->fn_post = 0;
|
|
||||||
free(req->args);
|
|
||||||
req->args = NULL;
|
|
||||||
async_nif_recycle_req(req, async_nif);
|
async_nif_recycle_req(req, async_nif);
|
||||||
req = NULL;
|
l = q;
|
||||||
|
} else {
|
||||||
|
/* This queue is empty, cycle through other queues looking for work. */
|
||||||
|
uatomic_dec(&q->num_workers);
|
||||||
|
q = q->next;
|
||||||
|
uatomic_inc(&q->num_workers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enif_mutex_lock(async_nif->we_mutex);
|
uatomic_dec(&q->num_workers);
|
||||||
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
|
cds_lfq_node_init_rcu(&worker->queue_entry);
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
rcu_read_lock();
|
||||||
__sync_fetch_and_add(&q->num_workers, -1);
|
cds_lfq_enqueue_rcu(&async_nif->worker_join_queue, &worker->queue_entry);
|
||||||
|
rcu_read_unlock();
|
||||||
|
rcu_unregister_thread();
|
||||||
enif_thread_exit(0);
|
enif_thread_exit(0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -451,83 +438,74 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
{
|
{
|
||||||
unsigned int i;
|
unsigned int i;
|
||||||
unsigned int num_queues = async_nif->num_queues;
|
unsigned int num_queues = async_nif->num_queues;
|
||||||
|
struct cds_lfq_node_rcu *node;
|
||||||
struct async_nif_work_queue *q = NULL;
|
struct async_nif_work_queue *q = NULL;
|
||||||
struct async_nif_req_entry *req = NULL;
|
|
||||||
struct async_nif_worker_entry *we = NULL;
|
|
||||||
UNUSED(env);
|
UNUSED(env);
|
||||||
|
|
||||||
/* Signal the worker threads, stop what you're doing and exit. To ensure
|
/* Signal the worker threads, stop what you're doing and exit. */
|
||||||
that we don't race with the enqueue() process we first lock all the worker
|
uatomic_set(&async_nif->shutdown, 1);
|
||||||
queues, then set shutdown to true, then unlock. The enqueue function will
|
|
||||||
take the queue mutex, then test for shutdown condition, then enqueue only
|
|
||||||
if not shutting down. */
|
|
||||||
for (i = 0; i < num_queues; i++) {
|
|
||||||
q = &async_nif->queues[i];
|
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
|
||||||
}
|
|
||||||
/* Set the shutdown flag so that worker threads will no continue
|
|
||||||
executing requests. */
|
|
||||||
async_nif->shutdown = 1;
|
|
||||||
for (i = 0; i < num_queues; i++) {
|
|
||||||
q = &async_nif->queues[i];
|
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Join for the now exiting worker threads. */
|
/* Join for the now exiting worker threads. */
|
||||||
while(async_nif->we_active > 0) {
|
while(uatomic_read(&async_nif->num_active_workers) > 0) {
|
||||||
for (i = 0; i < num_queues; i++)
|
struct async_nif_worker_entry *worker;
|
||||||
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
|
struct cds_lfq_node_rcu *node;
|
||||||
enif_mutex_lock(async_nif->we_mutex);
|
|
||||||
we = SLIST_FIRST(&async_nif->we_joining);
|
rcu_read_lock();
|
||||||
while(we != NULL) {
|
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
|
||||||
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
|
rcu_read_unlock();
|
||||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
|
||||||
|
if (node) {
|
||||||
|
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
|
||||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||||
enif_thread_join(we->tid, &exit_value);
|
enif_thread_join(worker->tid, &exit_value);
|
||||||
free(we);
|
free(worker);
|
||||||
async_nif->we_active--;
|
uatomic_dec(&async_nif->num_active_workers);
|
||||||
we = n;
|
|
||||||
}
|
}
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
|
||||||
}
|
}
|
||||||
enif_mutex_destroy(async_nif->we_mutex);
|
cds_lfq_destroy_rcu(&async_nif->worker_join_queue); // TODO(gburd): check return val
|
||||||
|
|
||||||
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
|
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
|
||||||
for (i = 0; i < num_queues; i++) {
|
for (i = 0; i < num_queues; i++) {
|
||||||
q = &async_nif->queues[i];
|
q = &async_nif->queues[i];
|
||||||
|
|
||||||
/* Worker threads are stopped, now toss anything left in the queue. */
|
/* Worker threads are stopped, now toss anything left in the queue. */
|
||||||
req = NULL;
|
for (;;) {
|
||||||
req = STAILQ_FIRST(&q->reqs);
|
rcu_read_lock();
|
||||||
while(req != NULL) {
|
node = cds_lfq_dequeue_rcu(&q->req_queue);
|
||||||
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
rcu_read_unlock();
|
||||||
|
|
||||||
|
if (!node) break; /* Queue is empty. */
|
||||||
|
|
||||||
|
struct async_nif_req_entry *req;
|
||||||
|
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
|
||||||
enif_clear_env(req->env);
|
enif_clear_env(req->env);
|
||||||
enif_send(NULL, &req->pid, req->env,
|
enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
|
||||||
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
|
|
||||||
req->fn_post(req->args);
|
req->fn_post(req->args);
|
||||||
enif_free_env(req->env);
|
|
||||||
free(req->args);
|
free(req->args);
|
||||||
|
enif_free_env(req->env);
|
||||||
free(req);
|
free(req);
|
||||||
req = n;
|
|
||||||
}
|
}
|
||||||
enif_mutex_destroy(q->reqs_mutex);
|
cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
|
||||||
enif_cond_destroy(q->reqs_cnd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free any req structures sitting unused on the recycle queue. */
|
/* Free any req structures sitting unused on the recycle queue. */
|
||||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
for (;;) {
|
||||||
req = NULL;
|
rcu_read_lock();
|
||||||
req = STAILQ_FIRST(&async_nif->recycled_reqs);
|
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
|
||||||
while(req != NULL) {
|
rcu_read_unlock();
|
||||||
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
|
||||||
enif_free_env(req->env);
|
if (!node) break; /* Queue is empty. */
|
||||||
free(req);
|
|
||||||
req = n;
|
struct async_nif_req_entry *req;
|
||||||
}
|
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
|
||||||
|
enif_free_env(req->env);
|
||||||
|
free(req->args);
|
||||||
|
free(req);
|
||||||
|
}
|
||||||
|
cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val
|
||||||
|
|
||||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
|
||||||
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
|
||||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||||
|
free_all_cpu_call_rcu_data();
|
||||||
free(async_nif);
|
free(async_nif);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,6 +529,10 @@ async_nif_load(ErlNifEnv *env)
|
||||||
ATOM_OK = enif_make_atom(env, "ok");
|
ATOM_OK = enif_make_atom(env, "ok");
|
||||||
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
|
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
|
||||||
|
|
||||||
|
/* Init the RCU library. */
|
||||||
|
rcu_init();
|
||||||
|
(void)create_all_cpu_call_rcu_data(0);
|
||||||
|
|
||||||
/* Find out how many schedulers there are. */
|
/* Find out how many schedulers there are. */
|
||||||
enif_system_info(&info, sizeof(ErlNifSysInfo));
|
enif_system_info(&info, sizeof(ErlNifSysInfo));
|
||||||
|
|
||||||
|
@ -566,6 +548,7 @@ async_nif_load(ErlNifEnv *env)
|
||||||
if (num_queues < 2)
|
if (num_queues < 2)
|
||||||
num_queues = 2;
|
num_queues = 2;
|
||||||
}
|
}
|
||||||
|
num_queues = 1; //TODO remove me.
|
||||||
|
|
||||||
/* Init our portion of priv_data's module-specific state. */
|
/* Init our portion of priv_data's module-specific state. */
|
||||||
async_nif = malloc(sizeof(struct async_nif_state) +
|
async_nif = malloc(sizeof(struct async_nif_state) +
|
||||||
|
@ -576,19 +559,15 @@ async_nif_load(ErlNifEnv *env)
|
||||||
sizeof(struct async_nif_work_queue) * num_queues);
|
sizeof(struct async_nif_work_queue) * num_queues);
|
||||||
|
|
||||||
async_nif->num_queues = num_queues;
|
async_nif->num_queues = num_queues;
|
||||||
async_nif->we_active = 0;
|
async_nif->num_active_workers = 0;
|
||||||
async_nif->next_q = 0;
|
async_nif->next_q = 0;
|
||||||
async_nif->shutdown = 0;
|
async_nif->shutdown = 0;
|
||||||
STAILQ_INIT(&async_nif->recycled_reqs);
|
cds_lfq_init_rcu(&async_nif->recycled_req_queue, call_rcu);
|
||||||
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
|
cds_lfq_init_rcu(&async_nif->worker_join_queue, call_rcu);
|
||||||
async_nif->we_mutex = enif_mutex_create("we");
|
|
||||||
SLIST_INIT(&async_nif->we_joining);
|
|
||||||
|
|
||||||
for (i = 0; i < async_nif->num_queues; i++) {
|
for (i = 0; i < async_nif->num_queues; i++) {
|
||||||
struct async_nif_work_queue *q = &async_nif->queues[i];
|
struct async_nif_work_queue *q = &async_nif->queues[i];
|
||||||
STAILQ_INIT(&q->reqs);
|
cds_lfq_init_rcu(&q->req_queue, call_rcu);
|
||||||
q->reqs_mutex = enif_mutex_create("reqs");
|
|
||||||
q->reqs_cnd = enif_cond_create("reqs");
|
|
||||||
q->next = &async_nif->queues[(i + 1) % num_queues];
|
q->next = &async_nif->queues[(i + 1) % num_queues];
|
||||||
}
|
}
|
||||||
return async_nif;
|
return async_nif;
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
|
||||||
|
#t=__.$$
|
||||||
|
#trap 'rm -f $t; exit 0' 0 1 2 3 13 15
|
||||||
|
|
||||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||||
POSIX_SHELL="true"
|
POSIX_SHELL="true"
|
||||||
|
@ -10,15 +13,22 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
|
# WiredTiger
|
||||||
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
|
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
|
||||||
WT_BRANCH=develop
|
#WT_BRANCH=develop
|
||||||
WT_DIR=wiredtiger-`basename $WT_BRANCH`
|
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
|
||||||
#WT_REF="tags/1.6.6"
|
WT_REF="tags/1.6.4"
|
||||||
#WT_DIR=wiredtiger-`basename $WT_REF`
|
WT_DIR=wiredtiger-`basename $WT_REF`
|
||||||
|
|
||||||
|
# Google's Snappy Compression
|
||||||
SNAPPY_VSN="1.0.4"
|
SNAPPY_VSN="1.0.4"
|
||||||
SNAPPY_DIR=snappy-$SNAPPY_VSN
|
SNAPPY_DIR=snappy-$SNAPPY_VSN
|
||||||
|
|
||||||
|
# User-space Read-Copy-Update (RCU)
|
||||||
|
URCU_REPO=git://git.lttng.org/userspace-rcu.git
|
||||||
|
URCU_REF="tags/v0.7.7"
|
||||||
|
URCU_DIR=urcu-`basename $URCU_REF`
|
||||||
|
|
||||||
[ `basename $PWD` != "c_src" ] && cd c_src
|
[ `basename $PWD` != "c_src" ] && cd c_src
|
||||||
|
|
||||||
export BASEDIR="$PWD"
|
export BASEDIR="$PWD"
|
||||||
|
@ -26,10 +36,41 @@ export BASEDIR="$PWD"
|
||||||
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
|
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
|
||||||
MAKE=${MAKE:-make}
|
MAKE=${MAKE:-make}
|
||||||
|
|
||||||
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
|
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
|
||||||
|
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
|
||||||
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
|
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
|
||||||
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
|
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
|
||||||
|
|
||||||
|
get_urcu ()
|
||||||
|
{
|
||||||
|
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
|
||||||
|
(cd $BASEDIR/$URCU_DIR && git pull -u) || exit 1
|
||||||
|
else
|
||||||
|
if [ "X$URCU_REF" != "X" ]; then
|
||||||
|
git clone ${URCU_REPO} ${URCU_DIR} && \
|
||||||
|
(cd $BASEDIR/$URCU_DIR && git checkout refs/$URCU_REF || exit 1)
|
||||||
|
else
|
||||||
|
git clone ${URCU_REPO} ${URCU_DIR} && \
|
||||||
|
(cd $BASEDIR/$URCU_DIR && git checkout -b $URCU_BRANCH origin/$URCU_BRANCH || exit 1)
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
|
||||||
|
(cd $BASEDIR/$URCU_DIR
|
||||||
|
[ -e $BASEDIR/urcu-build.patch ] && \
|
||||||
|
(patch -p1 --forward < $BASEDIR/urcu-build.patch || exit 1 )
|
||||||
|
autoreconf -fis || exit 1
|
||||||
|
urcu_configure;
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
urcu_configure ()
|
||||||
|
{
|
||||||
|
(cd $BASEDIR/$URCU_DIR
|
||||||
|
LDFLAGS+="-Wl,-rpath,lib/urcu-v0.7.7/priv:lib/urcu/priv:priv" \
|
||||||
|
CFLAGS+="-m64 -Os -g -march=native -mtune=native -fPIC" \
|
||||||
|
./configure --prefix=${BASEDIR}/system || exit 1)
|
||||||
|
}
|
||||||
|
|
||||||
get_wt ()
|
get_wt ()
|
||||||
{
|
{
|
||||||
if [ -d $BASEDIR/$WT_DIR/.git ]; then
|
if [ -d $BASEDIR/$WT_DIR/.git ]; then
|
||||||
|
@ -57,8 +98,9 @@ get_wt ()
|
||||||
wt_configure ()
|
wt_configure ()
|
||||||
{
|
{
|
||||||
(cd $BASEDIR/$WT_DIR/build_posix
|
(cd $BASEDIR/$WT_DIR/build_posix
|
||||||
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
|
CFLAGS+=-g ../configure --with-pic \
|
||||||
--enable-snappy \
|
--enable-snappy \
|
||||||
|
--disable-python --disable-java \
|
||||||
--prefix=${BASEDIR}/system || exit 1)
|
--prefix=${BASEDIR}/system || exit 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,8 +117,9 @@ get_snappy ()
|
||||||
|
|
||||||
get_deps ()
|
get_deps ()
|
||||||
{
|
{
|
||||||
get_snappy;
|
|
||||||
get_wt;
|
get_wt;
|
||||||
|
get_snappy;
|
||||||
|
get_urcu;
|
||||||
}
|
}
|
||||||
|
|
||||||
update_deps ()
|
update_deps ()
|
||||||
|
@ -90,6 +133,22 @@ update_deps ()
|
||||||
fi
|
fi
|
||||||
)
|
)
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
|
||||||
|
(cd $BASEDIR/$URCU_DIR
|
||||||
|
if [ "X$URCU_VSN" == "X" ]; then
|
||||||
|
git pull -u || exit 1
|
||||||
|
else
|
||||||
|
git checkout $URCU_VSN || exit 1
|
||||||
|
fi
|
||||||
|
)
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
build_urcu ()
|
||||||
|
{
|
||||||
|
urcu_configure;
|
||||||
|
(cd $BASEDIR/$URCU_DIR && $MAKE -j && $MAKE install)
|
||||||
}
|
}
|
||||||
|
|
||||||
build_wt ()
|
build_wt ()
|
||||||
|
@ -110,6 +169,7 @@ case "$1" in
|
||||||
clean)
|
clean)
|
||||||
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
||||||
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
|
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
|
||||||
|
[ -e $BASEDIR/$URCU_DIR/Makefile ] && (cd $BASEDIR/$URCU_DIR && $MAKE clean)
|
||||||
rm -rf system $SNAPPY_DIR
|
rm -rf system $SNAPPY_DIR
|
||||||
rm -f ${BASEDIR}/../priv/wt
|
rm -f ${BASEDIR}/../priv/wt
|
||||||
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
||||||
|
@ -130,23 +190,30 @@ case "$1" in
|
||||||
;;
|
;;
|
||||||
|
|
||||||
*)
|
*)
|
||||||
shopt -s extglob
|
|
||||||
SUFFIXES='@(so|dylib)'
|
|
||||||
|
|
||||||
# Build Snappy
|
# Build Snappy
|
||||||
[ -d $SNAPPY_DIR ] || get_snappy;
|
[ -d $SNAPPY_DIR ] || get_snappy;
|
||||||
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
|
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
|
||||||
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
|
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
|
||||||
|
|
||||||
|
# Build URCU
|
||||||
|
[ -d $URCU_DIR ] || get_urcu;
|
||||||
|
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
|
||||||
|
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
|
||||||
|
|
||||||
# Build WiredTiger
|
# Build WiredTiger
|
||||||
[ -d $WT_DIR ] || get_wt;
|
[ -d $WT_DIR ] || get_wt;
|
||||||
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
|
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
|
||||||
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
|
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
|
||||||
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
|
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;
|
||||||
|
|
||||||
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
|
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
|
||||||
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
|
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
|
||||||
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
|
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
|
||||||
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
|
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
|
||||||
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
|
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
|
||||||
|
cp -p -P $BASEDIR/system/lib/liburcu.so* ${BASEDIR}/../priv
|
||||||
|
cp -p -P $BASEDIR/system/lib/liburcu-*.so* ${BASEDIR}/../priv
|
||||||
;;
|
;;
|
||||||
esac
|
esac
|
||||||
|
|
||||||
|
exit 0
|
||||||
|
|
|
@ -42,8 +42,8 @@ extern "C" {
|
||||||
#define DPUTS(arg) ((void) 0)
|
#define DPUTS(arg) ((void) 0)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef __UNUSED
|
#ifndef UNUSED
|
||||||
#define __UNUSED(v) ((void)(v))
|
#define UNUSED(v) ((void)(v))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifndef COMPQUIET
|
#ifndef COMPQUIET
|
||||||
|
|
23
c_src/urcu-build.patch
Normal file
23
c_src/urcu-build.patch
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h
|
||||||
|
index 23bd1ce..98e00b4 100644
|
||||||
|
--- a/urcu/rculfhash.h
|
||||||
|
+++ b/urcu/rculfhash.h
|
||||||
|
@@ -31,6 +31,10 @@
|
||||||
|
#include <urcu-call-rcu.h>
|
||||||
|
#include <urcu-flavor.h>
|
||||||
|
|
||||||
|
+#ifndef UNUSED
|
||||||
|
+#define UNUSED(v) ((void)(v))
|
||||||
|
+#endif
|
||||||
|
+
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
@@ -92,6 +96,7 @@ typedef int (*cds_lfht_match_fct)(struct cds_lfht_node *node, const void *key);
|
||||||
|
static inline
|
||||||
|
void cds_lfht_node_init(struct cds_lfht_node *node)
|
||||||
|
{
|
||||||
|
+ UNUSED(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
|
@ -1,5 +1,5 @@
|
||||||
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
|
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
|
||||||
index 6d78823..c423590 100644
|
index 6d78823..2122cf8 100644
|
||||||
--- a/ext/compressors/snappy/Makefile.am
|
--- a/ext/compressors/snappy/Makefile.am
|
||||||
+++ b/ext/compressors/snappy/Makefile.am
|
+++ b/ext/compressors/snappy/Makefile.am
|
||||||
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
|
||||||
|
@ -10,3 +10,97 @@ index 6d78823..c423590 100644
|
||||||
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
|
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
|
||||||
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
|
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
|
||||||
libwiredtiger_snappy_la_LIBADD = -lsnappy
|
libwiredtiger_snappy_la_LIBADD = -lsnappy
|
||||||
|
diff --git a/src/support/cksum.c b/src/support/cksum.c
|
||||||
|
index 7e9befe..b924db7 100644
|
||||||
|
--- a/src/support/cksum.c
|
||||||
|
+++ b/src/support/cksum.c
|
||||||
|
@@ -27,6 +27,13 @@
|
||||||
|
|
||||||
|
#include "wt_internal.h"
|
||||||
|
|
||||||
|
+#if defined(__amd64) || defined(__x86_64)
|
||||||
|
+#define USE_HARDWARE_CRC32 1
|
||||||
|
+#else
|
||||||
|
+#undef USE_HARDWARE_CRC32
|
||||||
|
+#endif
|
||||||
|
+
|
||||||
|
+#ifdef USE_HARDWARE_CRC32
|
||||||
|
static const uint32_t g_crc_slicing[8][256] = {
|
||||||
|
#ifdef WORDS_BIGENDIAN
|
||||||
|
/*
|
||||||
|
@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = {
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
+#endif /* USE_HARDWARE_CRC32 */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* __wt_cksum --
|
||||||
|
@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len)
|
||||||
|
/* Checksum one byte at a time to the first 4B boundary. */
|
||||||
|
for (p = chunk;
|
||||||
|
((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 &&
|
||||||
|
- len > 0; ++p, --len)
|
||||||
|
+ len > 0; ++p, --len) {
|
||||||
|
+#ifdef USE_HARDWARE_CRC32
|
||||||
|
+ __asm__ __volatile__(
|
||||||
|
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
|
||||||
|
+ : "=S" (crc)
|
||||||
|
+ : "0" (crc), "c" (*p));
|
||||||
|
+#else
|
||||||
|
#ifdef WORDS_BIGENDIAN
|
||||||
|
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
|
||||||
|
#else
|
||||||
|
crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
|
||||||
|
#endif
|
||||||
|
+#endif
|
||||||
|
+ }
|
||||||
|
|
||||||
|
/* Checksum in 8B chunks. */
|
||||||
|
for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) {
|
||||||
|
+#ifdef USE_HARDWARE_CRC32
|
||||||
|
+ __asm__ __volatile__ (
|
||||||
|
+ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;"
|
||||||
|
+ : "=S"(crc)
|
||||||
|
+ : "S"(crc), "c"(*p));
|
||||||
|
+#else
|
||||||
|
crc ^= *(uint32_t *)p;
|
||||||
|
p += sizeof(uint32_t);
|
||||||
|
next = *(uint32_t *)p;
|
||||||
|
@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len)
|
||||||
|
g_crc_slicing[1][(next >> 16) & 0xFF] ^
|
||||||
|
g_crc_slicing[0][(next >> 24)];
|
||||||
|
#endif
|
||||||
|
+#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Checksum trailing bytes one byte at a time. */
|
||||||
|
+ for (len &= 0x7; len > 0; ++p, len--) {
|
||||||
|
+#ifdef USE_HARDWARE_CRC32
|
||||||
|
+ __asm__ __volatile__(
|
||||||
|
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
|
||||||
|
+ : "=S" (crc)
|
||||||
|
+ : "0" (crc), "c" (*p));
|
||||||
|
+#else
|
||||||
|
#ifdef WORDS_BIGENDIAN
|
||||||
|
- for (len &= 0x7; len > 0; ++p, len--)
|
||||||
|
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
|
||||||
|
+#else
|
||||||
|
+ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
|
||||||
|
+#endif
|
||||||
|
+#endif
|
||||||
|
+ }
|
||||||
|
|
||||||
|
+#ifdef WORDS_BIGENDIAN
|
||||||
|
/* Do final byte swap to produce a result identical to little endian */
|
||||||
|
crc =
|
||||||
|
((crc << 24) & 0xFF000000) |
|
||||||
|
((crc << 8) & 0x00FF0000) |
|
||||||
|
((crc >> 8) & 0x0000FF00) |
|
||||||
|
((crc >> 24) & 0x000000FF);
|
||||||
|
-#else
|
||||||
|
- for (len &= 0x7; len > 0; ++p, len--)
|
||||||
|
- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
|
||||||
|
#endif
|
||||||
|
return (~crc);
|
||||||
|
}
|
||||||
|
|
|
@ -440,7 +440,6 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback to handle error messages.
|
* Callback to handle error messages.
|
||||||
*
|
*
|
||||||
|
@ -455,15 +454,13 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
|
||||||
* operation or library failure.
|
* operation or library failure.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
__wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
|
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
|
||||||
int error, const char *message)
|
|
||||||
{
|
{
|
||||||
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
||||||
ErlNifEnv *msg_env;
|
ErlNifEnv *msg_env;
|
||||||
ErlNifPid *to_pid;
|
ErlNifPid *to_pid;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
|
||||||
UNUSED(session);
|
|
||||||
enif_mutex_lock(eh->error_mutex);
|
enif_mutex_lock(eh->error_mutex);
|
||||||
msg_env = eh->msg_env_error;
|
msg_env = eh->msg_env_error;
|
||||||
to_pid = &eh->to_pid;
|
to_pid = &eh->to_pid;
|
||||||
|
@ -495,14 +492,13 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
|
||||||
* operation or library failure.
|
* operation or library failure.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
__wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *message)
|
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
|
||||||
{
|
{
|
||||||
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
||||||
ErlNifEnv *msg_env;
|
ErlNifEnv *msg_env;
|
||||||
ErlNifPid *to_pid;
|
ErlNifPid *to_pid;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
|
||||||
UNUSED(session);
|
|
||||||
enif_mutex_lock(eh->message_mutex);
|
enif_mutex_lock(eh->message_mutex);
|
||||||
msg_env = eh->msg_env_message;
|
msg_env = eh->msg_env_message;
|
||||||
to_pid = &eh->to_pid;
|
to_pid = &eh->to_pid;
|
||||||
|
@ -533,14 +529,13 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const ch
|
||||||
* operation or library failure.
|
* operation or library failure.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
__wterl_progress_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *operation, uint64_t counter)
|
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
|
||||||
{
|
{
|
||||||
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
|
||||||
ErlNifEnv *msg_env;
|
ErlNifEnv *msg_env;
|
||||||
ErlNifPid *to_pid;
|
ErlNifPid *to_pid;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
|
||||||
UNUSED(session);
|
|
||||||
enif_mutex_lock(eh->progress_mutex);
|
enif_mutex_lock(eh->progress_mutex);
|
||||||
msg_env = eh->msg_env_progress;
|
msg_env = eh->msg_env_progress;
|
||||||
to_pid = &eh->to_pid;
|
to_pid = &eh->to_pid;
|
||||||
|
@ -2308,7 +2303,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
|
||||||
|
|
||||||
char msg[1024];
|
char msg[1024];
|
||||||
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
|
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
|
||||||
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, NULL, msg);
|
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
%%%% This is the WiredTiger section
|
|
||||||
|
|
||||||
%% @doc wiredtiger data_root
|
|
||||||
{mapping, "wiredtiger.data_root", "wterl.data_root", [
|
|
||||||
{default, "{{platform_data_dir}}/wiredtiger"}
|
|
||||||
]}.
|
|
|
@ -38,8 +38,8 @@
|
||||||
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
|
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
|
||||||
|
|
||||||
{port_env, [
|
{port_env, [
|
||||||
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
|
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -fdebug-cpp -ftrack-macro-expansion=2 -I c_src/system/include"},
|
||||||
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
|
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lurcu -lurcu-cds -lwiredtiger"}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
|
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
|
||||||
|
|
|
@ -22,7 +22,6 @@
|
||||||
|
|
||||||
-module(riak_kv_wterl_backend).
|
-module(riak_kv_wterl_backend).
|
||||||
-behavior(temp_riak_kv_backend).
|
-behavior(temp_riak_kv_backend).
|
||||||
-compile([{parse_transform, lager_transform}]).
|
|
||||||
|
|
||||||
%% KV Backend API
|
%% KV Backend API
|
||||||
-export([api_version/0,
|
-export([api_version/0,
|
||||||
|
@ -43,7 +42,7 @@
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-compile(export_all).
|
-compiel(export_all).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(API_VERSION, 1).
|
-define(API_VERSION, 1).
|
||||||
|
@ -120,14 +119,14 @@ start(Partition, Config) ->
|
||||||
"lsm" ->
|
"lsm" ->
|
||||||
[{internal_page_max, "128K"},
|
[{internal_page_max, "128K"},
|
||||||
{leaf_page_max, "16K"},
|
{leaf_page_max, "16K"},
|
||||||
{lsm, [
|
{lsm_chunk_size, "100MB"},
|
||||||
{bloom_config, [{leaf_page_max, "8MB"}]},
|
{lsm_merge_threads, 2},
|
||||||
{bloom_bit_count, 28},
|
{prefix_compression, true},
|
||||||
{bloom_hash_count, 19},
|
{lsm_bloom_newest, true},
|
||||||
{bloom_oldest, true},
|
{lsm_bloom_oldest, true} ,
|
||||||
{chunk_size, "100MB"},
|
{lsm_bloom_bit_count, 28},
|
||||||
{merge_threads, 2}
|
{lsm_bloom_hash_count, 19},
|
||||||
]}
|
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
|
||||||
] ++ Compressor;
|
] ++ Compressor;
|
||||||
"table" ->
|
"table" ->
|
||||||
Compressor
|
Compressor
|
||||||
|
@ -342,23 +341,22 @@ is_empty(#state{connection=Connection, table=Table}) ->
|
||||||
%% @doc Get the status information for this wterl backend
|
%% @doc Get the status information for this wterl backend
|
||||||
-spec status(state()) -> [{atom(), term()}].
|
-spec status(state()) -> [{atom(), term()}].
|
||||||
status(#state{connection=Connection, table=Table}) ->
|
status(#state{connection=Connection, table=Table}) ->
|
||||||
[].
|
case wterl:cursor_open(Connection, Table) of
|
||||||
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
|
{ok, Cursor} ->
|
||||||
%% {ok, Cursor} ->
|
TheStats =
|
||||||
%% TheStats =
|
case fetch_status(Cursor) of
|
||||||
%% case fetch_status(Cursor) of
|
{ok, Stats} ->
|
||||||
%% {ok, Stats} ->
|
Stats;
|
||||||
%% Stats;
|
{error, {eperm, _}} -> % TODO: review/fix this logic
|
||||||
%% {error, {eperm, _}} -> % TODO: review/fix this logic
|
{ok, []};
|
||||||
%% {ok, []};
|
_ ->
|
||||||
%% _ ->
|
{ok, []}
|
||||||
%% {ok, []}
|
end,
|
||||||
%% end,
|
wterl:cursor_close(Cursor),
|
||||||
%% wterl:cursor_close(Cursor),
|
TheStats;
|
||||||
%% TheStats;
|
{error, Reason2} ->
|
||||||
%% {error, Reason2} ->
|
{error, Reason2}
|
||||||
%% {error, Reason2}
|
end.
|
||||||
%% end.
|
|
||||||
|
|
||||||
%% @doc Register an asynchronous callback
|
%% @doc Register an asynchronous callback
|
||||||
-spec callback(reference(), any(), state()) -> {ok, state()}.
|
-spec callback(reference(), any(), state()) -> {ok, state()}.
|
||||||
|
@ -401,41 +399,30 @@ establish_connection(Config, Type) ->
|
||||||
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
||||||
|
|
||||||
%% WT Connection Options:
|
%% WT Connection Options:
|
||||||
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
|
%% NOTE: LSM auto-checkpoints, so we don't have too.
|
||||||
CheckpointSetting =
|
CheckpointSetting =
|
||||||
case Type =:= "lsm" of
|
case Type =:= "lsm" of
|
||||||
true ->
|
true ->
|
||||||
case LogSetting of
|
[];
|
||||||
true ->
|
|
||||||
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
|
|
||||||
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
|
|
||||||
_ ->
|
|
||||||
[]
|
|
||||||
end;
|
|
||||||
false ->
|
false ->
|
||||||
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
|
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
|
||||||
end,
|
end,
|
||||||
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
|
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
|
||||||
ConnectionOpts =
|
ConnectionOpts =
|
||||||
orddict:from_list(
|
orddict:from_list(
|
||||||
[ wterl:config_value(create, Config, true),
|
[ wterl:config_value(create, Config, true),
|
||||||
wterl:config_value(checkpoint_sync, Config, false),
|
wterl:config_value(sync, Config, false),
|
||||||
wterl:config_value(transaction_sync, Config, "none"),
|
|
||||||
wterl:config_value(log, Config, [{enabled, LogSetting}]),
|
|
||||||
wterl:config_value(mmap, Config, false),
|
|
||||||
wterl:config_value(checkpoint, Config, CheckpointSetting),
|
|
||||||
wterl:config_value(session_max, Config, max_sessions(Config)),
|
wterl:config_value(session_max, Config, max_sessions(Config)),
|
||||||
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
|
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
|
||||||
wterl:config_value(statistics, Config, [ "fast", "clear"]),
|
|
||||||
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
|
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
|
||||||
wterl:config_value(verbose, Config, [ "salvage", "verify"
|
wterl:config_value(verbose, Config, [ "salvage", "verify"
|
||||||
% Note: for some unknown reason, if you add these additional
|
% Note: for some unknown reason, if you add these additional
|
||||||
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
|
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
|
||||||
% no idea why... you've been warned.
|
% no idea why... yet... you've been warned.
|
||||||
%"block", "shared_cache", "reconcile", "evict", "lsm",
|
%"block", "shared_cache", "reconcile", "evict", "lsm",
|
||||||
%"fileops", "read", "write", "readserver", "evictserver",
|
%"fileops", "read", "write", "readserver", "evictserver",
|
||||||
%"hazard", "mutex", "ckpt"
|
%"hazard", "mutex", "ckpt"
|
||||||
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
|
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
|
||||||
|
|
||||||
%% WT Session Options:
|
%% WT Session Options:
|
||||||
SessionOpts = [{isolation, "snapshot"}],
|
SessionOpts = [{isolation, "snapshot"}],
|
||||||
|
@ -556,15 +543,15 @@ from_index_key(LKey) ->
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% Return all status from wterl statistics cursor
|
%% Return all status from wterl statistics cursor
|
||||||
%% fetch_status(Cursor) ->
|
fetch_status(Cursor) ->
|
||||||
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
|
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
|
||||||
%% fetch_status(_Cursor, {error, _}, Acc) ->
|
fetch_status(_Cursor, {error, _}, Acc) ->
|
||||||
%% lists:reverse(Acc);
|
lists:reverse(Acc);
|
||||||
%% fetch_status(_Cursor, not_found, Acc) ->
|
fetch_status(_Cursor, not_found, Acc) ->
|
||||||
%% lists:reverse(Acc);
|
lists:reverse(Acc);
|
||||||
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
|
fetch_status(Cursor, {ok, Stat}, Acc) ->
|
||||||
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
|
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
|
||||||
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
|
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
|
||||||
|
|
||||||
size_cache(RequestedSize) ->
|
size_cache(RequestedSize) ->
|
||||||
Size =
|
Size =
|
||||||
|
|
|
@ -96,8 +96,8 @@ nif_stub_error(Line) ->
|
||||||
-spec init() -> ok | {error, any()}.
|
-spec init() -> ok | {error, any()}.
|
||||||
init() ->
|
init() ->
|
||||||
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
|
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
|
||||||
[{wterl_vsn, "942e51b"},
|
[{wterl_vsn, "53307e8"},
|
||||||
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these
|
{wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
|
||||||
|
|
||||||
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
|
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
|
||||||
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
|
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
|
||||||
|
@ -454,26 +454,17 @@ config_to_bin([], Acc) ->
|
||||||
config_to_bin([{Key, Value} | Rest], Acc) ->
|
config_to_bin([{Key, Value} | Rest], Acc) ->
|
||||||
ConfigTypes =
|
ConfigTypes =
|
||||||
[{block_compressor, {string, quoted}},
|
[{block_compressor, {string, quoted}},
|
||||||
{bloom_bit_count, integer},
|
|
||||||
{bloom_config, config},
|
|
||||||
{bloom_hash_count, integer},
|
|
||||||
{bloom_newest, bool},
|
|
||||||
{bloom_oldest, bool},
|
|
||||||
{cache_size, string},
|
{cache_size, string},
|
||||||
{checkpoint, config},
|
{checkpoint, config},
|
||||||
{checkpoint_sync, bool},
|
|
||||||
{checksum, string},
|
{checksum, string},
|
||||||
{chunk_size, string},
|
|
||||||
{create, bool},
|
{create, bool},
|
||||||
{direct_io, list},
|
{direct_io, list},
|
||||||
{drop, list},
|
{drop, list},
|
||||||
{enabled, bool},
|
|
||||||
{error_prefix, string},
|
{error_prefix, string},
|
||||||
{eviction_target, integer},
|
{eviction_target, integer},
|
||||||
{eviction_trigger, integer},
|
{eviction_trigger, integer},
|
||||||
{extensions, {list, quoted}},
|
{extensions, {list, quoted}},
|
||||||
{statistics_fast, bool},
|
{statistics_fast, bool},
|
||||||
{file_max, string},
|
|
||||||
{force, bool},
|
{force, bool},
|
||||||
{from, string},
|
{from, string},
|
||||||
{hazard_max, integer},
|
{hazard_max, integer},
|
||||||
|
@ -483,21 +474,24 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
|
||||||
{isolation, string},
|
{isolation, string},
|
||||||
{key_type, string},
|
{key_type, string},
|
||||||
{leaf_page_max, string},
|
{leaf_page_max, string},
|
||||||
{log, config},
|
{logging, bool},
|
||||||
{lsm, config},
|
{lsm_bloom_bit_count, integer},
|
||||||
{mmap, bool},
|
{lsm_bloom_config, config},
|
||||||
{merge_threads, integer},
|
{lsm_bloom_hash_count, integer},
|
||||||
|
{lsm_bloom_newest, bool},
|
||||||
|
{lsm_bloom_oldest, bool},
|
||||||
|
{lsm_chunk_size, string},
|
||||||
|
{prefix_compression, bool},
|
||||||
|
{lsm_merge_threads, integer},
|
||||||
{multiprocess, bool},
|
{multiprocess, bool},
|
||||||
{name, string},
|
{name, string},
|
||||||
{overwrite, bool},
|
{overwrite, bool},
|
||||||
{prefix_compression, bool},
|
|
||||||
{raw, bool},
|
{raw, bool},
|
||||||
{session_max, integer},
|
{session_max, integer},
|
||||||
{statistics, list},
|
|
||||||
{statistics_log, config},
|
{statistics_log, config},
|
||||||
|
{sync, bool},
|
||||||
{target, {list, quoted}},
|
{target, {list, quoted}},
|
||||||
{to, string},
|
{to, string},
|
||||||
{transaction_sync, string},
|
|
||||||
{transactional, bool},
|
{transactional, bool},
|
||||||
{verbose, list},
|
{verbose, list},
|
||||||
{wait, integer}],
|
{wait, integer}],
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
|
|
||||||
{mode, max}.
|
{mode, max}.
|
||||||
{duration, 10}.
|
{duration, 10}.
|
||||||
{concurrent, 16}.
|
{concurrent, 4}.
|
||||||
{report_interval, 1}.
|
{report_interval, 1}.
|
||||||
{pb_timeout_general, 1000}. % ms
|
{pb_timeout_general, 1000}. % ms
|
||||||
%{pb_timeout_read, ?}.
|
%{pb_timeout_read, ?}.
|
||||||
|
@ -43,9 +43,7 @@
|
||||||
{wterl, [
|
{wterl, [
|
||||||
{connection, [
|
{connection, [
|
||||||
{create, true},
|
{create, true},
|
||||||
{session_sync, false},
|
{sync, false},
|
||||||
{transaction_sync, "none"},
|
|
||||||
{log, [{enabled, false}]},
|
|
||||||
{session_max, 1024},
|
{session_max, 1024},
|
||||||
{cache_size, 4294967296},
|
{cache_size, 4294967296},
|
||||||
{verbose, []},
|
{verbose, []},
|
||||||
|
@ -60,11 +58,11 @@
|
||||||
]},
|
]},
|
||||||
{session, [ {isolation, "snapshot"} ]},
|
{session, [ {isolation, "snapshot"} ]},
|
||||||
{table_uri, "lsm:test"},
|
{table_uri, "lsm:test"},
|
||||||
{lsm_merge_threads, 2},
|
|
||||||
{table, [
|
{table, [
|
||||||
{internal_page_max, "128K"},
|
{internal_page_max, "128K"},
|
||||||
{leaf_page_max, "128K"},
|
{leaf_page_max, "128K"},
|
||||||
{lsm_chunk_size, "25MB"},
|
{lsm_chunk_size, "25MB"},
|
||||||
|
{prefix_compression, false},
|
||||||
{lsm_bloom_newest, true},
|
{lsm_bloom_newest, true},
|
||||||
{lsm_bloom_oldest, true} ,
|
{lsm_bloom_oldest, true} ,
|
||||||
{lsm_bloom_bit_count, 128},
|
{lsm_bloom_bit_count, 128},
|
||||||
|
@ -78,9 +76,7 @@
|
||||||
{wterl_, [
|
{wterl_, [
|
||||||
{connection, [
|
{connection, [
|
||||||
{create, true},
|
{create, true},
|
||||||
{session_sync, false},
|
{sync, false},
|
||||||
{transaction_sync, "none"},
|
|
||||||
{log, [{enabled, false}]},
|
|
||||||
{session_max, 1024},
|
{session_max, 1024},
|
||||||
{cache_size, 4294967296},
|
{cache_size, 4294967296},
|
||||||
{verbose, []},
|
{verbose, []},
|
||||||
|
@ -97,6 +93,7 @@
|
||||||
{session, [ {isolation, "snapshot"} ]},
|
{session, [ {isolation, "snapshot"} ]},
|
||||||
{table_uri, "table:test"},
|
{table_uri, "table:test"},
|
||||||
{table, [
|
{table, [
|
||||||
|
{prefix_compression, false},
|
||||||
{block_compressor, "snappy"} % bzip2
|
{block_compressor, "snappy"} % bzip2
|
||||||
]}
|
]}
|
||||||
]}.
|
]}.
|
||||||
|
|
Loading…
Reference in a new issue