Compare commits


20 commits

Author SHA1 Message Date
Gregory Burd
cc807a97d6 Add new mmap config option. 2014-01-15 10:45:04 -05:00
Gregory Burd
ea99493ea3 Compensate for LSM config API changes 2013-12-09 12:54:58 -05:00
Gregory Burd
08b2d18463 Fix checkpoint config. 2013-11-20 13:10:28 -05:00
Gregory Burd
3302ab26ed Use the develop branch for now. 2013-11-19 14:16:26 -05:00
Gregory Burd
db2daf99b2 Default logging off. 2013-11-19 14:16:01 -05:00
Gregory Burd
68d9ed942b Update to WiredTiger 1.6.6 2013-11-18 20:54:59 -05:00
Gregory Burd
36faa4e713 Forgot to remove second use of checkpoint setting. 2013-11-18 20:50:26 -05:00
Gregory Burd
e560185420 When logging is enabled, enable checkpoints, even when using LSM. 2013-11-18 20:46:22 -05:00
Gregory Burd
448c0b555c Update config to match latest available options. 2013-10-30 15:00:41 -04:00
Gregory Burd
634bcd188a Integrate new configuration options available in WiredTiger. 2013-10-30 14:50:14 -04:00
Gregory Burd
1664fdcf8c API for handlers in WiredTiger changed to include session state, update our use of the API to match that change. 2013-10-30 13:11:14 -04:00
Gregory Burd
95515f111c Merge pull request #11 from basho-labs/gsb-2.0-fixes
Changes related to Riak 2.0 and an issue with how statistics were gathered from the backend
2013-10-30 08:53:49 -07:00
Gregory Burd
ac2c5caeff Change a few default configs and comment out the stats gathering for now. 2013-10-30 11:50:20 -04:00
Gregory Burd
75305dae94 Minor updates. 2013-10-12 21:48:05 -04:00
Gregory Burd
7d0ad2dce1 Update the version strings and a few config values which changed names. 2013-10-02 14:42:26 -04:00
Gregory Burd
84a85bbe38 Open a *statistics* cursor when gathering statistics. 2013-10-02 14:41:36 -04:00
Gregory Burd
9d2896016b A few build automation changes/fixes. 2013-10-02 14:41:06 -04:00
Gregory Burd
17585a99b1 priv now has the schema file in it, so be more specific with what we ignore in that dir 2013-10-02 14:38:41 -04:00
Gregory Burd
942e51b753 OS/X uses ".dylib" rather than ".so" for shared libraries (because it's
special) so I've worked around that.  Also tightened up some tests so that
we're not rebuilding the libraries when not necessary.
2013-09-06 09:54:55 -04:00
Gregory Burd
c60fa22422 Retry three times, then bail out and return not found. 2013-09-04 13:12:37 -04:00
13 changed files with 342 additions and 472 deletions

.gitignore

@@ -7,6 +7,8 @@ c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/
priv/wt
priv/*.so*
priv/*.dylib*
log/
*~


@@ -52,19 +52,17 @@ endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile test
all: deps compile
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
c_src/build_deps.sh get-deps
$(REBAR) get-deps
$(REBAR) compile
update-deps:
c_src/build_deps.sh update-deps
$(REBAR) update-deps
$(REBAR) compile


@@ -24,11 +24,9 @@
extern "C" {
#endif
#include <stdlib.h>
#include <urcu.h> /* RCU flavor */
#include <urcu/arch.h> /* RCU Architecture */
#include <urcu/tls-compat.h> /* RCU Thread-local storage */
#include <urcu/rculfqueue.h> /* RCU Lock-free queue */
#include <assert.h>
#include "queue.h"
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
@@ -39,9 +37,6 @@ extern "C" {
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
@@ -50,6 +45,7 @@ static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@@ -57,15 +53,17 @@ struct async_nif_req_entry {
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
STAILQ_ENTRY(async_nif_req_entry) entries;
};
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
struct cds_lfq_queue_rcu req_queue;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
@@ -73,39 +71,40 @@ struct async_nif_worker_entry {
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
struct cds_lfq_node_rcu queue_entry;
struct rcu_head rcu_head;
SLIST_ENTRY(async_nif_worker_entry) entries;
};
struct async_nif_state {
unsigned int shutdown;
unsigned int num_active_workers;
struct cds_lfq_queue_rcu worker_join_queue;
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues;
unsigned int next_q;
struct cds_lfq_queue_rcu recycled_req_queue;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl##_args frame; \
static void fn_work_##decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl##_args *args) { \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \
static void fn_post_##decl (struct decl##_args *args) { \
static void fn_post_ ## decl (struct decl ## _args *args) { \
UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl##_args on_stack_args; \
struct decl##_args *args = &on_stack_args; \
struct decl##_args *copy_of_args; \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \
@@ -125,22 +124,22 @@ struct async_nif_state {
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_##decl (args); \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl##_args)); \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_##decl ; \
req->fn_post = (void (*)(void *))fn_post_##decl; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_##decl (args); \
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
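Most of the churn in this macro hunk is whitespace around the preprocessor token-pasting operator: decl##_args and decl ## _args paste to the same token, so the change is purely stylistic. A standalone sketch (not part of this diff):

#include <stdio.h>

/* ## pastes tokens; whitespace around the operator is insignificant. */
#define ARGS_T(decl)        struct decl##_args
#define ARGS_T_SPACED(decl) struct decl ## _args

ARGS_T(put) { int n; };             /* expands to: struct put_args { int n; }; */

int main(void)
{
    ARGS_T_SPACED(put) a = { 42 };  /* same type: struct put_args */
    printf("%d\n", a.n);
    return 0;
}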
@@ -190,38 +189,30 @@ struct async_nif_state {
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
/* Look for a request on our Lock-Free/RCU Queue first. */
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
rcu_read_unlock();
if (node) {
/* The goal is to reuse these req structs, not malloc/free them
repeatedly so we don't `call_rcu(&async_nif->rcu, free_req_cb);`.
We reuse this req, then when exiting we'll free all of them at
once. */
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
} else {
/* The reuse queue is empty, create a new request. */
if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
uatomic_inc(&async_nif->num_reqs);
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
free(req);
req = NULL;
}
}
}
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
}
@@ -235,21 +226,14 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
/* Three things to do here to prepare this request struct for reuse.
1) clear the NIF Environment
2) zero out the req struct except...
3) keep a pointer to the env so we can reset it in the req */
ErlNifEnv *env = req->env;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
enif_clear_env(req->env);
if (req->args) free(req->args);
env = req->env;
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
/* Now enqueue this request on our Lock-Free/RCU Queue to be reused later. */
cds_lfq_node_init_rcu(&req->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&async_nif->recycled_req_queue, &req->queue_entry);
rcu_read_unlock();
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
static void *async_nif_worker_fn(void *);
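This revision drops the liburcu lock-free queues (cds_lfq_*) in favor of the BSD-style intrusive list macros from queue.h, protected by the existing mutexes. A self-contained sketch of the STAILQ macros used for the request queues, assuming the system <sys/queue.h> rather than the bundled queue.h (not part of this diff):

#include <stdio.h>
#include <stdlib.h>
#include <sys/queue.h>

struct req {
    int id;
    STAILQ_ENTRY(req) entries;          /* embedded link, as in async_nif_req_entry */
};

STAILQ_HEAD(req_list, req);

int main(void)
{
    struct req_list reqs;
    STAILQ_INIT(&reqs);

    for (int i = 0; i < 3; i++) {       /* enqueue at the tail */
        struct req *r = calloc(1, sizeof(*r));
        r->id = i;
        STAILQ_INSERT_TAIL(&reqs, r, entries);
    }

    while (!STAILQ_EMPTY(&reqs)) {      /* dequeue from the head */
        struct req *r = STAILQ_FIRST(&reqs);
        STAILQ_REMOVE(&reqs, r, req, entries);
        printf("req %d\n", r->id);
        free(r);
    }
    return 0;
}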
@@ -260,38 +244,41 @@ static void *async_nif_worker_fn(void *);
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *worker;
struct async_nif_worker_entry *we;
if (0 == q)
return EINVAL;
/* Before creating a new worker thread join threads which have exited. */
for(;;) {
struct cds_lfq_node_rcu *node;
enif_mutex_lock(async_nif->we_mutex);
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(worker->tid, &exit_value);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
worker = malloc(sizeof(struct async_nif_worker_entry));
if (!worker) return ENOMEM;
memset(worker, 0, sizeof(struct async_nif_worker_entry));
worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1);
worker->async_nif = async_nif;
worker->q = q;
return enif_thread_create(NULL,&worker->tid, &async_nif_worker_fn, (void*)worker, 0);
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
}
/**
@@ -308,8 +295,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0;
//rcu_register_thread();
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
@@ -327,58 +312,59 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, d, n = 0;
unsigned int j, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
d = uatomic_read(&async_nif->queues[j].depth);
if (j != qid && d != 0) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += d;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (uatomic_read(&q->depth) <= avg_depth) break;
else qid = (qid + 1) % async_nif->num_queues;
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
}
}
/* If the for loop finished then we didn't find a suitable queue for this
request (e.g. we're backed up servicing requests) or the shutdown flag was
set. Returning '0' will toss this request and free its resources.*/
if (i == async_nif->num_queues || uatomic_read(&async_nif->shutdown))
return 0;
request, meaning we're backed up so trigger eagain. Note that if we left
the loop in this way we hold no lock. */
if (i == async_nif->num_queues) return 0;
/* Add the request to the queue. */
cds_lfq_node_init_rcu(&req->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&q->req_queue, &req->queue_entry);
rcu_read_unlock();
URCU_TLS(nr_enqueues)++;
uatomic_inc(&q->depth);
uatomic_inc(&q->num_workers);
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
__sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
while (uatomic_read(&q->depth) > uatomic_read(&q->num_workers)) {
while (q->depth > q->num_workers) {
switch(async_nif_start_worker(async_nif, q)) {
case EINVAL:
case ENOMEM:
return 0;
default:
case EAGAIN:
continue;
case 0:
uatomic_inc(&q->num_workers);
goto done;
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
}
}done:;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
return enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
}
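The loop above implements a simple load-balancing heuristic: a queue is accepted only if its depth does not exceed the average depth of the other non-empty queues; otherwise the rotation moves on, and if every queue fails the test the request is rejected (eagain). A standalone sketch of that selection rule, assuming a plain array of depths (hypothetical, not the real struct layout):

#include <stdio.h>

#define NUM_QUEUES 4

/* Returns the queue index to enqueue on, or -1 if every queue is
   deeper than the average of its non-empty peers (we're backed up). */
int pick_queue(const unsigned int depth[NUM_QUEUES], unsigned int qid)
{
    unsigned int i, j, n;
    for (i = 0; i < NUM_QUEUES; i++) {
        double avg = 0.0;
        for (j = 0, n = 0; j < NUM_QUEUES; j++) {
            if (j != qid && depth[j] != 0) {
                avg += depth[j];
                n++;
            }
        }
        if (n) avg /= n;
        if (depth[qid] <= avg)
            return (int)qid;              /* no deeper than its peers */
        qid = (qid + 1) % NUM_QUEUES;     /* try the next queue */
    }
    return -1;                            /* all queues backed up */
}

int main(void)
{
    unsigned int depth[NUM_QUEUES] = { 4, 0, 9, 2 };
    /* Starting at queue 2 (depth 9): its peers' average is (4+2)/2 = 3,
       so it is skipped; queue 3 (depth 2 <= (4+9)/2) is accepted. */
    printf("%d\n", pick_queue(depth, 2));  /* prints 3 */
    return 0;
}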
/**
@@ -389,46 +375,73 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = worker->worker_id;
struct async_nif_state *async_nif = worker->async_nif;
struct async_nif_work_queue *l = NULL, *q = worker->q;
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
// TODO(gburd): set_affinity(); to the CPU_ID for this queue
rcu_register_thread();
while(q != l) {
struct cds_lfq_node_rcu *node;
struct async_nif_req_entry *req = NULL;
if (uatomic_read(&async_nif->shutdown))
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break;
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&q->req_queue);
rcu_read_unlock();
if (node) {
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
uatomic_dec(&q->depth);
URCU_TLS(nr_dequeues)++;
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
req->fn_post(req->args);
async_nif_recycle_req(req, async_nif);
l = q;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leave this loop (break), which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* This queue is empty, cycle through other queues looking for work. */
uatomic_dec(&q->num_workers);
q = q->next;
uatomic_inc(&q->num_workers);
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
}
}
uatomic_dec(&q->num_workers);
cds_lfq_node_init_rcu(&worker->queue_entry);
rcu_read_lock();
cds_lfq_enqueue_rcu(&async_nif->worker_join_queue, &worker->queue_entry);
rcu_read_unlock();
rcu_unregister_thread();
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
__sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0);
return 0;
}
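The worker loop above is the classic mutex/condition-variable consumer: take the lock, re-check the predicates after every wakeup, dequeue while still holding the lock, and do the work after unlocking. A minimal pthread rendition of the same shape (standalone, not part of this diff; pthread_* stands in for the enif_* wrappers):

#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t reqs_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  reqs_cnd   = PTHREAD_COND_INITIALIZER;
static int  queue_depth   = 0;      /* stands in for STAILQ_EMPTY(&q->reqs) */
static bool shutting_down = false;

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&reqs_mutex);
        /* Re-check after every wakeup: waits can return spuriously,
           and another worker may already have taken the request. */
        while (queue_depth == 0 && !shutting_down)
            pthread_cond_wait(&reqs_cnd, &reqs_mutex);
        if (shutting_down) {
            pthread_mutex_unlock(&reqs_mutex);
            break;
        }
        queue_depth--;              /* "dequeue" while holding the lock */
        pthread_mutex_unlock(&reqs_mutex);
        /* ... perform the request's work outside the lock ... */
    }
    return NULL;
}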
@@ -438,74 +451,83 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct cds_lfq_node_rcu *node;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. */
uatomic_set(&async_nif->shutdown, 1);
/* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will not continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
/* Join the now-exiting worker threads. */
while(uatomic_read(&async_nif->num_active_workers) > 0) {
struct async_nif_worker_entry *worker;
struct cds_lfq_node_rcu *node;
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->worker_join_queue);
rcu_read_unlock();
if (node) {
worker = caa_container_of(node, struct async_nif_worker_entry, queue_entry);
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(worker->tid, &exit_value);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
enif_mutex_unlock(async_nif->we_mutex);
}
cds_lfq_destroy_rcu(&async_nif->worker_join_queue); // TODO(gburd): check return val
enif_mutex_destroy(async_nif->we_mutex);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
for (;;) {
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&q->req_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
free(req->args);
enif_free_env(req->env);
free(req);
req = NULL;
req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
free(req);
req = n;
}
cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
/* Free any req structures sitting unused on the recycle queue. */
for (;;) {
rcu_read_lock();
node = cds_lfq_dequeue_rcu(&async_nif->recycled_req_queue);
rcu_read_unlock();
if (!node) break; /* Queue is empty. */
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req->args);
free(req);
req = n;
}
cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free_all_cpu_call_rcu_data();
free(async_nif);
}
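The shutdown handshake described in the comment above (lock every queue, set the flag, unlock) works because an enqueue that tests the flag under its own queue mutex can never interleave with the flag flip. A minimal sketch of that pattern, with hypothetical names and pthread mutexes standing in for ErlNifMutex:

#include <pthread.h>

#define NUM_QUEUES 4
static pthread_mutex_t q_mutex[NUM_QUEUES] = {
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER
};
static int shutdown_flag = 0;

void begin_shutdown(void)
{
    int i;
    for (i = 0; i < NUM_QUEUES; i++)   /* nobody can be mid-enqueue now */
        pthread_mutex_lock(&q_mutex[i]);
    shutdown_flag = 1;
    for (i = 0; i < NUM_QUEUES; i++)
        pthread_mutex_unlock(&q_mutex[i]);
}

int try_enqueue(int qid)
{
    pthread_mutex_lock(&q_mutex[qid]);
    if (shutdown_flag) {               /* tested under the same lock */
        pthread_mutex_unlock(&q_mutex[qid]);
        return 0;                      /* rejected: shutting down */
    }
    /* ... insert the request into queue qid ... */
    pthread_mutex_unlock(&q_mutex[qid]);
    return 1;
}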
@@ -529,10 +551,6 @@ async_nif_load(ErlNifEnv *env)
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Init the RCU library. */
rcu_init();
(void)create_all_cpu_call_rcu_data(0);
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
@@ -548,7 +566,6 @@ async_nif_load(ErlNifEnv *env)
if (num_queues < 2)
num_queues = 2;
}
num_queues = 1; //TODO remove me.
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
@@ -559,15 +576,19 @@ async_nif_load(ErlNifEnv *env)
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->num_active_workers = 0;
async_nif->we_active = 0;
async_nif->next_q = 0;
async_nif->shutdown = 0;
cds_lfq_init_rcu(&async_nif->recycled_req_queue, call_rcu);
cds_lfq_init_rcu(&async_nif->worker_join_queue, call_rcu);
STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
cds_lfq_init_rcu(&q->req_queue, call_rcu);
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
}
return async_nif;


@@ -1,8 +1,5 @@
#!/bin/bash
#t=__.$$
#trap 'rm -f $t; exit 0' 0 1 2 3 13 15
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
@@ -13,22 +10,15 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e
# WiredTiger
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
#WT_BRANCH=develop
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
WT_REF="tags/1.6.4"
WT_DIR=wiredtiger-`basename $WT_REF`
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
# Google's Snappy Compression
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
# User-space Read-Copy-Update (RCU)
URCU_REPO=git://git.lttng.org/userspace-rcu.git
URCU_REF="tags/v0.7.7"
URCU_DIR=urcu-`basename $URCU_REF`
[ `basename $PWD` != "c_src" ] && cd c_src
export BASEDIR="$PWD"
@@ -36,41 +26,10 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
get_urcu ()
{
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR && git pull -u) || exit 1
else
if [ "X$URCU_REF" != "X" ]; then
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout refs/$URCU_REF || exit 1)
else
git clone ${URCU_REPO} ${URCU_DIR} && \
(cd $BASEDIR/$URCU_DIR && git checkout -b $URCU_BRANCH origin/$URCU_BRANCH || exit 1)
fi
fi
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$URCU_DIR
[ -e $BASEDIR/urcu-build.patch ] && \
(patch -p1 --forward < $BASEDIR/urcu-build.patch || exit 1 )
autoreconf -fis || exit 1
urcu_configure;
)
}
urcu_configure ()
{
(cd $BASEDIR/$URCU_DIR
LDFLAGS+="-Wl,-rpath,lib/urcu-v0.7.7/priv:lib/urcu/priv:priv" \
CFLAGS+="-m64 -Os -g -march=native -mtune=native -fPIC" \
./configure --prefix=${BASEDIR}/system || exit 1)
}
get_wt ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
@@ -98,9 +57,8 @@ get_wt ()
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g ../configure --with-pic \
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
--enable-snappy \
--disable-python --disable-java \
--prefix=${BASEDIR}/system || exit 1)
}
@@ -117,9 +75,8 @@ get_snappy ()
get_deps ()
{
get_wt;
get_snappy;
get_urcu;
get_wt;
}
update_deps ()
@@ -133,22 +90,6 @@ update_deps ()
fi
)
fi
if [ -d $BASEDIR/$URCU_DIR/.git ]; then
(cd $BASEDIR/$URCU_DIR
if [ "X$URCU_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $URCU_VSN || exit 1
fi
)
fi
}
build_urcu ()
{
urcu_configure;
(cd $BASEDIR/$URCU_DIR && $MAKE -j && $MAKE install)
}
build_wt ()
@@ -169,7 +110,6 @@ case "$1" in
clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
[ -e $BASEDIR/$URCU_DIR/Makefile ] && (cd $BASEDIR/$URCU_DIR && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
@@ -190,30 +130,23 @@
;;
*)
shopt -s extglob
SUFFIXES='@(so|dylib)'
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build URCU
[ -d $URCU_DIR ] || get_urcu;
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/liburcu-*.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
;;
esac
exit 0


@@ -42,8 +42,8 @@ extern "C" {
#define DPUTS(arg) ((void) 0)
#endif
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET
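The rename from UNUSED to __UNUSED presumably avoids a collision with the UNUSED macro defined in async_nif.h; both expand to a void cast that consumes a value with no effect, e.g.:

/* A void cast silences -Wunused-parameter under -Wall -Wextra. */
#define __UNUSED(v) ((void)(v))

static int handler(int used, int not_used)
{
    __UNUSED(not_used);
    return used * 2;
}

(Strictly speaking, identifiers beginning with a double underscore are reserved for the implementation, though the pattern is common in practice.)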


@@ -1,23 +0,0 @@
diff --git a/urcu/rculfhash.h b/urcu/rculfhash.h
index 23bd1ce..98e00b4 100644
--- a/urcu/rculfhash.h
+++ b/urcu/rculfhash.h
@@ -31,6 +31,10 @@
#include <urcu-call-rcu.h>
#include <urcu-flavor.h>
+#ifndef UNUSED
+#define UNUSED(v) ((void)(v))
+#endif
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -92,6 +96,7 @@ typedef int (*cds_lfht_match_fct)(struct cds_lfht_node *node, const void *key);
static inline
void cds_lfht_node_init(struct cds_lfht_node *node)
{
+ UNUSED(node);
}
/*


@@ -1,106 +1,12 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..2122cf8 100644
index 6d78823..c423590 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/support/cksum.c b/src/support/cksum.c
index 7e9befe..b924db7 100644
--- a/src/support/cksum.c
+++ b/src/support/cksum.c
@@ -27,6 +27,13 @@
#include "wt_internal.h"
+#if defined(__amd64) || defined(__x86_64)
+#define USE_HARDWARE_CRC32 1
+#else
+#undef USE_HARDWARE_CRC32
+#endif
+
+#ifdef USE_HARDWARE_CRC32
static const uint32_t g_crc_slicing[8][256] = {
#ifdef WORDS_BIGENDIAN
/*
@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = {
}
#endif
};
+#endif /* USE_HARDWARE_CRC32 */
/*
* __wt_cksum --
@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len)
/* Checksum one byte at a time to the first 4B boundary. */
for (p = chunk;
((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 &&
- len > 0; ++p, --len)
+ len > 0; ++p, --len) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
#else
crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
+#endif
+ }
/* Checksum in 8B chunks. */
for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__ (
+ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;"
+ : "=S"(crc)
+ : "S"(crc), "c"(*p));
+#else
crc ^= *(uint32_t *)p;
p += sizeof(uint32_t);
next = *(uint32_t *)p;
@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len)
g_crc_slicing[1][(next >> 16) & 0xFF] ^
g_crc_slicing[0][(next >> 24)];
#endif
+#endif
}
/* Checksum trailing bytes one byte at a time. */
+ for (len &= 0x7; len > 0; ++p, len--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
- for (len &= 0x7; len > 0; ++p, len--)
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
+#else
+ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
+#endif
+#endif
+ }
+#ifdef WORDS_BIGENDIAN
/* Do final byte swap to produce a result identical to little endian */
crc =
((crc << 24) & 0xFF000000) |
((crc << 8) & 0x00FF0000) |
((crc >> 8) & 0x0000FF00) |
((crc >> 24) & 0x000000FF);
-#else
- for (len &= 0x7; len > 0; ++p, len--)
- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
return (~crc);
}
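The .byte sequences in this patch hand-encode the SSE4.2 crc32 instruction, so the checksum becomes hardware CRC32-C on x86-64 while other builds keep the slicing tables. On a compiler that provides <nmmintrin.h>, the same loop structure can be written with intrinsics; a sketch, assuming x86-64 and -msse4.2 (not the patch's actual code):

#include <nmmintrin.h>   /* SSE4.2 CRC32 intrinsics; build with -msse4.2 */
#include <stddef.h>
#include <stdint.h>

/* CRC32-C (Castagnoli) over a buffer: bytes to an 8B boundary,
   then 8B strides, then the tail, mirroring __wt_cksum's phases. */
uint32_t crc32c_hw(const void *chunk, size_t len)
{
    const uint8_t *p = chunk;
    uint32_t crc = 0xffffffff;

    for (; ((uintptr_t)p & (sizeof(uint64_t) - 1)) != 0 && len > 0; ++p, --len)
        crc = _mm_crc32_u8(crc, *p);

    uint64_t crc64 = crc;
    for (; len >= sizeof(uint64_t); p += sizeof(uint64_t), len -= sizeof(uint64_t))
        crc64 = _mm_crc32_u64(crc64, *(const uint64_t *)p);
    crc = (uint32_t)crc64;

    for (; len > 0; ++p, --len)
        crc = _mm_crc32_u8(crc, *p);

    return ~crc;
}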


@@ -440,6 +440,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
return;
}
/**
* Callback to handle error messages.
*
@@ -454,13 +455,15 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
* operation or library failure.
*/
int
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
__wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
@@ -492,13 +495,14 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
* operation or library failure.
*/
int
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
__wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
@@ -529,13 +533,14 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
* operation or library failure.
*/
int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
__wterl_progress_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
@@ -2303,7 +2308,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, NULL, msg);
return 0;
}

priv/wterl.schema (new file)

@@ -0,0 +1,6 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.


@@ -38,8 +38,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -fdebug-cpp -ftrack-macro-expansion=2 -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lurcu -lurcu-cds -lwiredtiger"}
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.


@@ -22,6 +22,7 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]).
%% KV Backend API
-export([api_version/0,
@@ -42,7 +43,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compiel(export_all).
-compile(export_all).
-endif.
-define(API_VERSION, 1).
@@ -119,14 +120,14 @@ start(Partition, Config) ->
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "16K"},
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, 2},
{prefix_compression, true},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 28},
{lsm_bloom_hash_count, 19},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
{lsm, [
{bloom_config, [{leaf_page_max, "8MB"}]},
{bloom_bit_count, 28},
{bloom_hash_count, 19},
{bloom_oldest, true},
{chunk_size, "100MB"},
{merge_threads, 2}
]}
] ++ Compressor;
"table" ->
Compressor
@@ -341,22 +342,23 @@ is_empty(#state{connection=Connection, table=Table}) ->
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) ->
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
TheStats =
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end,
wterl:cursor_close(Cursor),
TheStats;
{error, Reason2} ->
{error, Reason2}
end.
[].
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@@ -399,30 +401,41 @@ establish_connection(Config, Type) ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
%% NOTE: LSM auto-checkpoints, so we don't have to.
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
CheckpointSetting =
case Type =:= "lsm" of
true ->
[];
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
% no idea why... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
@@ -543,15 +556,15 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =


@@ -96,8 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}.
init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "53307e8"},
{wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
[{wterl_vsn, "942e51b"},
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@@ -454,17 +454,26 @@ config_to_bin([], Acc) ->
config_to_bin([{Key, Value} | Rest], Acc) ->
ConfigTypes =
[{block_compressor, {string, quoted}},
{bloom_bit_count, integer},
{bloom_config, config},
{bloom_hash_count, integer},
{bloom_newest, bool},
{bloom_oldest, bool},
{cache_size, string},
{checkpoint, config},
{checkpoint_sync, bool},
{checksum, string},
{chunk_size, string},
{create, bool},
{direct_io, list},
{drop, list},
{enabled, bool},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{statistics_fast, bool},
{file_max, string},
{force, bool},
{from, string},
{hazard_max, integer},
@@ -474,24 +483,21 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{prefix_compression, bool},
{lsm_merge_threads, integer},
{log, config},
{lsm, config},
{mmap, bool},
{merge_threads, integer},
{multiprocess, bool},
{name, string},
{overwrite, bool},
{prefix_compression, bool},
{raw, bool},
{session_max, integer},
{statistics, list},
{statistics_log, config},
{sync, bool},
{target, {list, quoted}},
{to, string},
{transaction_sync, string},
{transactional, bool},
{verbose, list},
{wait, integer}],


@@ -25,7 +25,7 @@
{mode, max}.
{duration, 10}.
{concurrent, 4}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
@@ -43,7 +43,9 @@
{wterl, [
{connection, [
{create, true},
{sync, false},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@@ -58,11 +60,11 @@
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
@@ -76,7 +78,9 @@
{wterl_, [
{connection, [
{create, true},
{sync, false},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@@ -93,7 +97,6 @@
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{prefix_compression, false},
{block_compressor, "snappy"} % bzip2
]}
]}.