Compare commits

..

No commits in common. "master" and "gsb-shared-cursors" have entirely different histories.

27 changed files with 1198 additions and 5390 deletions

View file

@ -1,4 +0,0 @@
handle SIGPIPE nostop noprint pass
#b erl_nif.c:1203
#b sys/unix/erl_unix_sys_ddll.c:234

8
.gitignore vendored
View file

@ -1,14 +1,8 @@
*.beam *.beam
.eunit .eunit
ebin ebin
priv/*.so
c_src/system c_src/system
c_src/wiredtiger*/ c_src/wiredtiger*/
c_src/*.o c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/wt
priv/*.so*
priv/*.dylib*
log/
*~ *~

View file

@ -1,2 +0,0 @@
-author('Steve Vinoski <steve@basho.com>').
-author('Gregory Burd <steve@basho.com>'). % greg@burd.me @gregburd

143
Makefile
View file

@ -1,72 +1,24 @@
# Copyright 2012 Erlware, LLC. All Rights Reserved. TARGET= wterl
#
# This file is provided to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Source: https://gist.github.com/ericbmerritt/5706091 REBAR= ./rebar
#REBAR= /usr/bin/env rebar
ERL= /usr/bin/env erl
DIALYZER= /usr/bin/env dialyzer
ERLFLAGS= -pa $(CURDIR)/.eunit -pa $(CURDIR)/ebin -pa $(CURDIR)/deps/*/ebin .PHONY: plt analyze all deps compile get-deps clean
DEPS_PLT=$(CURDIR)/.deps_plt all: compile
DEPS=erts kernel stdlib
# ============================================================================= deps: get-deps
# Verify that the programs we need to run are installed on this system
# =============================================================================
ERL = $(shell which erl)
ifeq ($(ERL),) get-deps:
$(error "Erlang not available on this system") @$(REBAR) get-deps
endif
REBAR=$(shell which rebar)
ifeq ($(REBAR),)
$(error "Rebar not available on this system")
endif
DIALYZER=$(shell which dialyzer)
ifeq ($(DIALYZER),)
$(error "Dialyzer not available on this system")
endif
TYPER=$(shell which typer)
ifeq ($(TYPER),)
$(error "Typer not available on this system")
endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
$(REBAR) get-deps
$(REBAR) compile
update-deps: update-deps:
$(REBAR) update-deps @$(REBAR) update-deps
$(REBAR) compile
c_src/wterl.o: c_src/async_nif.h c_src/wterl.o:
touch c_src/wterl.c touch c_src/wterl.c
ebin/app_helper.beam: ebin/app_helper.beam:
@ -75,52 +27,33 @@ ebin/app_helper.beam:
@/bin/false @/bin/false
compile: c_src/wterl.o ebin/app_helper.beam compile: c_src/wterl.o ebin/app_helper.beam
$(REBAR) skip_deps=true compile @$(REBAR) compile
doc:
$(REBAR) skip_deps=true doc
eunit: compile clean-common-test-data
$(REBAR) skip_deps=true eunit
test: compile eunit
$(DEPS_PLT):
@echo Building local plt at $(DEPS_PLT)
@echo
dialyzer --output_plt $(DEPS_PLT) --build_plt \
--apps $(DEPS) -r deps
dialyzer: $(DEPS_PLT)
$(DIALYZER) --fullpath --plt $(DEPS_PLT) -Wrace_conditions -r ./ebin
typer:
$(TYPER) --plt $(DEPS_PLT) -r ./src
xref:
$(REBAR) xref skip_deps=true
# You often want *rebuilt* rebar tests to be available to the shell you have to
# call eunit (to get the tests rebuilt). However, eunit runs the tests, which
# probably fails (thats probably why You want them in the shell). This
# (prefixing the command with "-") runs eunit but tells make to ignore the
# result.
shell: deps compile
- @$(REBAR) skip_deps=true eunit
@$(ERL) $(ERLFLAGS)
pdf:
pandoc README.md -o README.pdf
clean: clean:
- c_src/build_deps.sh clean @$(REBAR) clean
- rm -rf $(CURDIR)/test/*.beam
- rm -rf $(CURDIR)/logs
- rm -rf $(CURDIR)/ebin
$(REBAR) skip_deps=true clean
distclean: clean test: eunit
- rm -rf $(DEPS_PLT)
- rm -rvf $(CURDIR)/deps
rebuild: distclean deps compile escript dialyzer test eunit: compile
@$(REBAR) eunit skip_deps=true
eunit_console:
@$(ERL) -pa .eunit deps/*/ebin
plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/*/ebin --apps kernel stdlib
analyze: compile
$(DIALYZER) --plt .$(TARGET).plt -pa deps/*/ebin ebin
repl:
$(ERL) -pz deps/*/ebin -pa ebin
gdb-repl:
USE_GDB=1 $(ERL) -pz deps/*/ebin -pa ebin
eunit-repl:
$(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
gdb-eunit-repl:
USE_GDB=1 $(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'

View file

@ -1,54 +1,26 @@
`wterl` is an Erlang interface to the WiredTiger database, and is written to `wterl` is an Erlang interface to the WiredTiger database, and is written
support a Riak storage backend that uses WiredTiger. to support a Riak storage backend that uses WiredTiger.
This backend currently supports only key-value storage and retrieval.
Remaining work includes: Remaining work includes:
TODO: * The `wterl:session_create` function currently returns an error under
* Find/fix any code marked "TODO:" certain circumstances, so we currently ignore its return value.
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1 during * The `riak_kv_wterl_backend` module is currently designed to rely on the
fold_objects/4? fact that it runs in just a single Erlang scheduler thread, which is
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1? necessary because WiredTiger doesn't allow a session to be used
* Why do we see {error, {eperm, _}} result on wterl:cursor_next/1 during concurrently by different threads. If the KV node design ever changes to
is_empty/1? involve concurrency across scheduler threads, this current design will no
* Why do we see {error, {eperm, _}} result on wterl:cursor_next_value/1 longer work correctly.
during status/1?
* Why do we see {error, {ebusy, _}} result on wterl:drop/2?
* Determine a better way to estimate the number of sessions we should
configure WT for at startup in riak_kv_wterl_backend:max_sessions/1.
* Make sure Erlang is optimizing for selective receive in async_nif_enqueue/3
because in the eLevelDB driver there is a comment: "This cannot be a separate
function. Code must be inline to trigger Erlang compiler's use of optimized
selective receive."
* Provide a way to configure the cursor options, right now they are
always "raw,overwrite".
* Add support for Riak/KV 2i indexes using the same design pattern
as eLevelDB (in a future version consider alternate schema)
* If an operation using a shared cursor results in a non-normal error
then it should be closed/discarded from the recycled pool
* Cache cursors based on hash(table/config) rather than just table.
* Finish NIF unload/reload functions and test.
* Test an upgrade, include a format/schema/WT change.
* When WT_PANIC is returned first try to unload/reload then driver
and reset all state, if that fails then exit gracefully.
* Currently the `riak_kv_wterl_backend` module is stored in this * Currently the `riak_kv_wterl_backend` module is stored in this
repository, but it really belongs in the `riak_kv` repository. repository, but it really belongs in the `riak_kv` repository.
* wterl:truncate/5 can segv, and its tests are commented out * There are currently some stability issues with WiredTiger that can
* Add async_nif and wterl NIF stats to the results provided by the sometimes cause errors when restarting KV nodes with non-empty WiredTiger
stats API storage.
* Longer term ideas/changes to consider:
* More testing, especially pulse/qc
* Riak/KV integration
* Store 2i indexes in separate tables
* Store buckets, in separate tables and keep a <<bucket/key>> index
to ensure that folds across a vnode are easy
* Provide a drop bucket API call
* Support key expirey
* An ets API (like the LevelDB's lets project)
* Use mime-type to inform WT's schema for key value encoding
* Other use cases within Riak
* An AAE driver using WT
* An ability to store the ring file via WT
Future support for secondary indexes requires WiredTiger features that are
under development but are not yet available.
Deploying Deploying
--------- ---------

View file

@ -1,609 +0,0 @@
/*
* async_nif: An async thread-pool layer for Erlang's NIF API
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include <assert.h>
#include "queue.h"
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
};
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
ErlNifTid tid;
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
};
struct async_nif_state {
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues;
unsigned int next_q;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
req = async_nif_reuse_req(async_nif); \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
} \
return reply; \
}
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
} while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() do { \
async_nif_recycle_req(req, async_nif); \
return enif_make_badarg(env); \
} while(0);
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
/**
* Return a request structure from the recycled req queue if one exists,
* otherwise create one.
*/
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
free(req);
req = NULL;
}
}
}
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
}
/**
* Store the request for future re-use.
*
* req a request entry with an ErlNifEnv* which will be cleared
* before reuse, but not until then.
* async_nif a handle to our state so that we can find and use the mutex
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
enif_clear_env(req->env);
env = req->env;
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
static void *async_nif_worker_fn(void *);
/**
* Start up a worker thread.
*/
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *we;
if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
}
/**
* Enqueue a request for processing by a worker thread.
*
* Places the request into a work queue determined either by the
* provided affinity or by iterating through the available queues.
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* Identify the most appropriate worker for this request. */
unsigned int i, last_qid, qid = 0;
struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
if (hint >= 0) {
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
}
}
/* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. Note that if we left
the loop in this way we hold no lock. */
if (i == async_nif->num_queues) return 0;
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
__sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
while (q->depth > q->num_workers) {
switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
}
}done:;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
}
/**
* Worker threads execute this function. Here each worker pulls requests of
* their respective queues, executes that work and continues doing that until
* they see the shutdown flag is set at which point they exit.
*/
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
}
}
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
__sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0);
return 0;
}
static void
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will no continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
enif_mutex_unlock(async_nif->we_mutex);
}
enif_mutex_destroy(async_nif->we_mutex);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
req = NULL;
req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
free(req);
req = n;
}
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free(async_nif);
}
static void *
async_nif_load(ErlNifEnv *env)
{
static int has_init = 0;
unsigned int i, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
/* Don't init more than once. */
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
/* Size the number of work queues according to schedulers. */
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
num_queues = ASYNC_NIF_MAX_WORKERS / 2;
} else {
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
if (remainder != 0)
num_queues = info.scheduler_threads - remainder;
else
num_queues = info.scheduler_threads;
if (num_queues < 2)
num_queues = 2;
}
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->we_active = 0;
async_nif->next_q = 0;
async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
}
return async_nif;
}
static void
async_nif_upgrade(ErlNifEnv *env)
{
UNUSED(env);
// TODO:
}
#if defined(__cplusplus)
}
#endif
#endif // __ASYNC_NIF_H__

View file

@ -1,152 +1,33 @@
#!/bin/bash #!/bin/bash
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
export POSIX_SHELL
exec /usr/bin/ksh $0 $@
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
set -e set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git WT_BRANCH=basho
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
[ `basename $PWD` != "c_src" ] && cd c_src [ `basename $PWD` != "c_src" ] && cd c_src
export BASEDIR="$PWD" BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
get_wt ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
else
if [ "X$WT_REF" != "X" ]; then
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1)
else
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure;
)
}
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
--enable-snappy \
--prefix=${BASEDIR}/system || exit 1)
}
get_snappy ()
{
[ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1)
[ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz
[ -e $BASEDIR/snappy-build.patch ] && \
(cd $BASEDIR/$SNAPPY_DIR
patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1)
(cd $BASEDIR/$SNAPPY_DIR
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
}
get_deps ()
{
get_snappy;
get_wt;
}
update_deps ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR
if [ "X$WT_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $WT_VSN || exit 1
fi
)
fi
}
build_wt ()
{
wt_configure;
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE -j && $MAKE install)
}
build_snappy ()
{
(cd $BASEDIR/$SNAPPY_DIR && \
$MAKE -j && \
$MAKE install
)
}
case "$1" in case "$1" in
clean) clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ rm -rf system wiredtiger
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
rm -f ${BASEDIR}/../priv/libwiredtiger_*.so
rm -f ${BASEDIR}/../priv/libsnappy.so.*
;;
test)
(cd $BASEDIR/$WT_DIR && $MAKE -j test)
;;
update-deps)
update-deps;
;;
get-deps)
get_deps;
;; ;;
*) *)
shopt -s extglob test -f system/lib/libwiredtiger.a && exit 0
SUFFIXES='@(so|dylib)'
# Build Snappy if [ -d wiredtiger/.git ]; then
[ -d $SNAPPY_DIR ] || get_snappy; (cd wiredtiger && \
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1) git fetch && \
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy; git merge origin/$WT_BRANCH)
else
# Build WiredTiger git clone https://github.com/wiredtiger/wiredtiger.git -b $WT_BRANCH && \
[ -d $WT_DIR ] || get_wt; (cd wiredtiger && ./autogen.sh)
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) fi
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \ (cd wiredtiger/build_posix && \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt; ../configure --with-pic \
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv --prefix=$BASEDIR/system && \
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv make && make install)
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
;; ;;
esac esac

View file

@ -1,66 +0,0 @@
/*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __COMMON_H__
#define __COMMON_H__
#if defined(__cplusplus)
extern "C" {
#endif
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# undef DEBUG
# define DEBUG 0
# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif DEBUG
#include <stdio.h>
#include <stdarg.h>
#define DPRINTF(fmt, ...) \
do { \
fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \
fflush(stderr); \
} while(0)
#define DPUTS(arg) DPRINTF("%s", arg)
#else
#define DPRINTF(fmt, ...) ((void) 0)
#define DPUTS(arg) ((void) 0)
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET
#define COMPQUIET(n, v) do { \
(n) = (v); \
(n) = (n); \
} while (0)
#endif
#ifdef __APPLE__
#define PRIuint64(x) (x)
#else
#define PRIuint64(x) (unsigned long long)(x)
#endif
#if defined(__cplusplus)
}
#endif
#endif // __COMMON_H__

View file

@ -1,678 +0,0 @@
/*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 4. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
* $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $
*/
#ifndef _DB_QUEUE_H_
#define _DB_QUEUE_H_
#ifndef __offsetof
#define __offsetof(st, m) \
((size_t) ( (char *)&((st *)0)->m - (char *)0 ))
#endif
#ifndef __containerof
#define __containerof(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - __offsetof(type,member) );})
#endif
#if defined(__cplusplus)
extern "C" {
#endif
/*
* This file defines four types of data structures: singly-linked lists,
* singly-linked tail queues, lists and tail queues.
*
* A singly-linked list is headed by a single forward pointer. The elements
* are singly linked for minimum space and pointer manipulation overhead at
* the expense of O(n) removal for arbitrary elements. New elements can be
* added to the list after an existing element or at the head of the list.
* Elements being removed from the head of the list should use the explicit
* macro for this purpose for optimum efficiency. A singly-linked list may
* only be traversed in the forward direction. Singly-linked lists are ideal
* for applications with large datasets and few or no removals or for
* implementing a LIFO queue.
*
* A singly-linked tail queue is headed by a pair of pointers, one to the
* head of the list and the other to the tail of the list. The elements are
* singly linked for minimum space and pointer manipulation overhead at the
* expense of O(n) removal for arbitrary elements. New elements can be added
* to the list after an existing element, at the head of the list, or at the
* end of the list. Elements being removed from the head of the tail queue
* should use the explicit macro for this purpose for optimum efficiency.
* A singly-linked tail queue may only be traversed in the forward direction.
* Singly-linked tail queues are ideal for applications with large datasets
* and few or no removals or for implementing a FIFO queue.
*
* A list is headed by a single forward pointer (or an array of forward
* pointers for a hash table header). The elements are doubly linked
* so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before
* or after an existing element or at the head of the list. A list
* may only be traversed in the forward direction.
*
* A tail queue is headed by a pair of pointers, one to the head of the
* list and the other to the tail of the list. The elements are doubly
* linked so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before or
* after an existing element, at the head of the list, or at the end of
* the list. A tail queue may be traversed in either direction.
*
* For details on the use of these macros, see the queue(3) manual page.
*
*
* SLIST LIST STAILQ TAILQ
* _HEAD + + + +
* _HEAD_INITIALIZER + + + +
* _ENTRY + + + +
* _INIT + + + +
* _EMPTY + + + +
* _FIRST + + + +
* _NEXT + + + +
* _PREV - - - +
* _LAST - - + +
* _FOREACH + + + +
* _FOREACH_REVERSE - - - +
* _INSERT_HEAD + + + +
* _INSERT_BEFORE - + - +
* _INSERT_AFTER + + + +
* _INSERT_TAIL - - + +
* _CONCAT - - + +
* _REMOVE_HEAD + - + -
* _REMOVE + + + +
*
*/
/*
* XXX
* We #undef all of the macros because there are incompatible versions of this
* file and these macros on various systems. What makes the problem worse is
* they are included and/or defined by system include files which we may have
* already loaded into Berkeley DB before getting here. For example, FreeBSD's
* <rpc/rpc.h> includes its system <sys/queue.h>, and VxWorks UnixLib.h defines
* several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these
* same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours.
*/
#undef LIST_EMPTY
#undef LIST_ENTRY
#undef LIST_FIRST
#undef LIST_FOREACH
#undef LIST_HEAD
#undef LIST_HEAD_INITIALIZER
#undef LIST_INIT
#undef LIST_INSERT_AFTER
#undef LIST_INSERT_BEFORE
#undef LIST_INSERT_HEAD
#undef LIST_NEXT
#undef LIST_REMOVE
#undef QMD_TRACE_ELEM
#undef QMD_TRACE_HEAD
#undef QUEUE_MACRO_DEBUG
#undef SLIST_EMPTY
#undef SLIST_ENTRY
#undef SLIST_FIRST
#undef SLIST_FOREACH
#undef SLIST_FOREACH_PREVPTR
#undef SLIST_HEAD
#undef SLIST_HEAD_INITIALIZER
#undef SLIST_INIT
#undef SLIST_INSERT_AFTER
#undef SLIST_INSERT_HEAD
#undef SLIST_NEXT
#undef SLIST_REMOVE
#undef SLIST_REMOVE_HEAD
#undef STAILQ_CONCAT
#undef STAILQ_EMPTY
#undef STAILQ_ENTRY
#undef STAILQ_FIRST
#undef STAILQ_FOREACH
#undef STAILQ_HEAD
#undef STAILQ_HEAD_INITIALIZER
#undef STAILQ_INIT
#undef STAILQ_INSERT_AFTER
#undef STAILQ_INSERT_HEAD
#undef STAILQ_INSERT_TAIL
#undef STAILQ_LAST
#undef STAILQ_NEXT
#undef STAILQ_REMOVE
#undef STAILQ_REMOVE_HEAD
#undef STAILQ_REMOVE_HEAD_UNTIL
#undef TAILQ_CONCAT
#undef TAILQ_EMPTY
#undef TAILQ_ENTRY
#undef TAILQ_FIRST
#undef TAILQ_FOREACH
#undef TAILQ_FOREACH_REVERSE
#undef TAILQ_HEAD
#undef TAILQ_HEAD_INITIALIZER
#undef TAILQ_INIT
#undef TAILQ_INSERT_AFTER
#undef TAILQ_INSERT_BEFORE
#undef TAILQ_INSERT_HEAD
#undef TAILQ_INSERT_TAIL
#undef TAILQ_LAST
#undef TAILQ_NEXT
#undef TAILQ_PREV
#undef TAILQ_REMOVE
#undef TRACEBUF
#undef TRASHIT
#define QUEUE_MACRO_DEBUG 0
#if QUEUE_MACRO_DEBUG
/* Store the last 2 places the queue element or head was altered */
struct qm_trace {
char * lastfile;
int lastline;
char * prevfile;
int prevline;
};
#define TRACEBUF struct qm_trace trace;
#define TRASHIT(x) do {(x) = (void *)-1;} while (0)
#define QMD_TRACE_HEAD(head) do { \
(head)->trace.prevline = (head)->trace.lastline; \
(head)->trace.prevfile = (head)->trace.lastfile; \
(head)->trace.lastline = __LINE__; \
(head)->trace.lastfile = __FILE__; \
} while (0)
#define QMD_TRACE_ELEM(elem) do { \
(elem)->trace.prevline = (elem)->trace.lastline; \
(elem)->trace.prevfile = (elem)->trace.lastfile; \
(elem)->trace.lastline = __LINE__; \
(elem)->trace.lastfile = __FILE__; \
} while (0)
#else
#define QMD_TRACE_ELEM(elem)
#define QMD_TRACE_HEAD(head)
#define TRACEBUF
#define TRASHIT(x)
#endif /* QUEUE_MACRO_DEBUG */
/*
* Singly-linked List declarations.
*/
#define SLIST_HEAD(name, type) \
struct name { \
struct type *slh_first; /* first element */ \
}
#define SLIST_HEAD_INITIALIZER(head) \
{ NULL }
#define SLIST_ENTRY(type) \
struct { \
struct type *sle_next; /* next element */ \
}
/*
* Singly-linked List functions.
*/
#define SLIST_EMPTY(head) ((head)->slh_first == NULL)
#define SLIST_FIRST(head) ((head)->slh_first)
#define SLIST_FOREACH(var, head, field) \
for ((var) = SLIST_FIRST((head)); \
(var); \
(var) = SLIST_NEXT((var), field))
#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \
for ((varp) = &SLIST_FIRST((head)); \
((var) = *(varp)) != NULL; \
(varp) = &SLIST_NEXT((var), field))
#define SLIST_INIT(head) do { \
SLIST_FIRST((head)) = NULL; \
} while (0)
#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \
SLIST_NEXT((slistelm), field) = (elm); \
} while (0)
#define SLIST_INSERT_HEAD(head, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \
SLIST_FIRST((head)) = (elm); \
} while (0)
#define SLIST_NEXT(elm, field) ((elm)->field.sle_next)
#define SLIST_REMOVE(head, elm, type, field) do { \
if (SLIST_FIRST((head)) == (elm)) { \
SLIST_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = SLIST_FIRST((head)); \
while (SLIST_NEXT(curelm, field) != (elm)) \
curelm = SLIST_NEXT(curelm, field); \
SLIST_NEXT(curelm, field) = \
SLIST_NEXT(SLIST_NEXT(curelm, field), field); \
} \
} while (0)
#define SLIST_REMOVE_HEAD(head, field) do { \
SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \
} while (0)
/*
* Singly-linked Tail queue declarations.
*/
#define STAILQ_HEAD(name, type) \
struct name { \
struct type *stqh_first;/* first element */ \
struct type **stqh_last;/* addr of last next element */ \
}
#define STAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).stqh_first }
#define STAILQ_ENTRY(type) \
struct { \
struct type *stqe_next; /* next element */ \
}
/*
* Singly-linked Tail queue functions.
*/
#define STAILQ_CONCAT(head1, head2) do { \
if (!STAILQ_EMPTY((head2))) { \
*(head1)->stqh_last = (head2)->stqh_first; \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_INIT((head2)); \
} \
} while (0)
#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL)
#define STAILQ_FIRST(head) ((head)->stqh_first)
#define STAILQ_FOREACH(var, head, field) \
for ((var) = STAILQ_FIRST((head)); \
(var); \
(var) = STAILQ_NEXT((var), field))
#define STAILQ_INIT(head) do { \
STAILQ_FIRST((head)) = NULL; \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_NEXT((tqelm), field) = (elm); \
} while (0)
#define STAILQ_INSERT_HEAD(head, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_FIRST((head)) = (elm); \
} while (0)
#define STAILQ_INSERT_TAIL(head, elm, field) do { \
STAILQ_NEXT((elm), field) = NULL; \
*(head)->stqh_last = (elm); \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_LAST(head, type, field) \
(STAILQ_EMPTY((head)) ? \
NULL : \
((struct type *) \
((char *)((head)->stqh_last) - __offsetof(struct type, field))))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
#define STAILQ_REMOVE(head, elm, type, field) do { \
if (STAILQ_FIRST((head)) == (elm)) { \
STAILQ_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = STAILQ_FIRST((head)); \
while (STAILQ_NEXT(curelm, field) != (elm)) \
curelm = STAILQ_NEXT(curelm, field); \
if ((STAILQ_NEXT(curelm, field) = \
STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((curelm), field);\
} \
} while (0)
#define STAILQ_REMOVE_HEAD(head, field) do { \
if ((STAILQ_FIRST((head)) = \
STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \
if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
/*
* List declarations.
*/
#define LIST_HEAD(name, type) \
struct name { \
struct type *lh_first; /* first element */ \
}
#define LIST_HEAD_INITIALIZER(head) \
{ NULL }
#define LIST_ENTRY(type) \
struct { \
struct type *le_next; /* next element */ \
struct type **le_prev; /* address of previous next element */ \
}
/*
* List functions.
*/
#define LIST_EMPTY(head) ((head)->lh_first == NULL)
#define LIST_FIRST(head) ((head)->lh_first)
#define LIST_FOREACH(var, head, field) \
for ((var) = LIST_FIRST((head)); \
(var); \
(var) = LIST_NEXT((var), field))
#define LIST_INIT(head) do { \
LIST_FIRST((head)) = NULL; \
} while (0)
#define LIST_INSERT_AFTER(listelm, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\
LIST_NEXT((listelm), field)->field.le_prev = \
&LIST_NEXT((elm), field); \
LIST_NEXT((listelm), field) = (elm); \
(elm)->field.le_prev = &LIST_NEXT((listelm), field); \
} while (0)
#define LIST_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.le_prev = (listelm)->field.le_prev; \
LIST_NEXT((elm), field) = (listelm); \
*(listelm)->field.le_prev = (elm); \
(listelm)->field.le_prev = &LIST_NEXT((elm), field); \
} while (0)
#define LIST_INSERT_HEAD(head, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \
LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\
LIST_FIRST((head)) = (elm); \
(elm)->field.le_prev = &LIST_FIRST((head)); \
} while (0)
#define LIST_NEXT(elm, field) ((elm)->field.le_next)
#define LIST_REMOVE(elm, field) do { \
if (LIST_NEXT((elm), field) != NULL) \
LIST_NEXT((elm), field)->field.le_prev = \
(elm)->field.le_prev; \
*(elm)->field.le_prev = LIST_NEXT((elm), field); \
} while (0)
/*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}
#define TAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).tqh_first }
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}
/*
* Tail queue functions.
*/
#define TAILQ_CONCAT(head1, head2, field) do { \
if (!TAILQ_EMPTY(head2)) { \
*(head1)->tqh_last = (head2)->tqh_first; \
(head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
(head1)->tqh_last = (head2)->tqh_last; \
TAILQ_INIT((head2)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_HEAD(head2); \
} \
} while (0)
#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL)
#define TAILQ_FIRST(head) ((head)->tqh_first)
#define TAILQ_FOREACH(var, head, field) \
for ((var) = TAILQ_FIRST((head)); \
(var); \
(var) = TAILQ_NEXT((var), field))
#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \
for ((var) = TAILQ_LAST((head), headname); \
(var); \
(var) = TAILQ_PREV((var), headname, field))
#define TAILQ_INIT(head) do { \
TAILQ_FIRST((head)) = NULL; \
(head)->tqh_last = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
} while (0)
#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\
TAILQ_NEXT((elm), field)->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else { \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
} \
TAILQ_NEXT((listelm), field) = (elm); \
(elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.tqe_prev = (listelm)->field.tqe_prev; \
TAILQ_NEXT((elm), field) = (listelm); \
*(listelm)->field.tqe_prev = (elm); \
(listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_HEAD(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \
TAILQ_FIRST((head))->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
TAILQ_FIRST((head)) = (elm); \
(elm)->field.tqe_prev = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_INSERT_TAIL(head, elm, field) do { \
TAILQ_NEXT((elm), field) = NULL; \
(elm)->field.tqe_prev = (head)->tqh_last; \
*(head)->tqh_last = (elm); \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_LAST(head, headname) \
(*(((struct headname *)((head)->tqh_last))->tqh_last))
#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
#define TAILQ_PREV(elm, headname, field) \
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#define TAILQ_REMOVE(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field)) != NULL) \
TAILQ_NEXT((elm), field)->field.tqe_prev = \
(elm)->field.tqe_prev; \
else { \
(head)->tqh_last = (elm)->field.tqe_prev; \
QMD_TRACE_HEAD(head); \
} \
*(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \
TRASHIT((elm)->field.tqe_next); \
TRASHIT((elm)->field.tqe_prev); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
/*
* Circular queue definitions.
*/
#define CIRCLEQ_HEAD(name, type) \
struct name { \
struct type *cqh_first; /* first element */ \
struct type *cqh_last; /* last element */ \
}
#define CIRCLEQ_HEAD_INITIALIZER(head) \
{ (void *)&head, (void *)&head }
#define CIRCLEQ_ENTRY(type) \
struct { \
struct type *cqe_next; /* next element */ \
struct type *cqe_prev; /* previous element */ \
}
/*
* Circular queue functions.
*/
#define CIRCLEQ_INIT(head) do { \
(head)->cqh_first = (void *)(head); \
(head)->cqh_last = (void *)(head); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm)->field.cqe_next; \
(elm)->field.cqe_prev = (listelm); \
if ((listelm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(listelm)->field.cqe_next->field.cqe_prev = (elm); \
(listelm)->field.cqe_next = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm); \
(elm)->field.cqe_prev = (listelm)->field.cqe_prev; \
if ((listelm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(listelm)->field.cqe_prev->field.cqe_next = (elm); \
(listelm)->field.cqe_prev = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \
(elm)->field.cqe_next = (head)->cqh_first; \
(elm)->field.cqe_prev = (void *)(head); \
if ((head)->cqh_last == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(head)->cqh_first->field.cqe_prev = (elm); \
(head)->cqh_first = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \
(elm)->field.cqe_next = (void *)(head); \
(elm)->field.cqe_prev = (head)->cqh_last; \
if ((head)->cqh_first == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(head)->cqh_last->field.cqe_next = (elm); \
(head)->cqh_last = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_REMOVE(head, elm, field) do { \
if ((elm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm)->field.cqe_prev; \
else \
(elm)->field.cqe_next->field.cqe_prev = \
(elm)->field.cqe_prev; \
if ((elm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm)->field.cqe_next; \
else \
(elm)->field.cqe_prev->field.cqe_next = \
(elm)->field.cqe_next; \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_FOREACH(var, head, field) \
for ((var) = ((head)->cqh_first); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_next))
#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \
for ((var) = ((head)->cqh_last); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_prev))
/*
* Circular queue access methods.
*/
#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head))
#define CIRCLEQ_FIRST(head) ((head)->cqh_first)
#define CIRCLEQ_LAST(head) ((head)->cqh_last)
#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next)
#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev)
#define CIRCLEQ_LOOP_NEXT(head, elm, field) \
(((elm)->field.cqe_next == (void *)(head)) \
? ((head)->cqh_first) \
: (elm->field.cqe_next))
#define CIRCLEQ_LOOP_PREV(head, elm, field) \
(((elm)->field.cqe_prev == (void *)(head)) \
? ((head)->cqh_last) \
: (elm->field.cqe_prev))
#if defined(__cplusplus)
}
#endif
#endif /* !_DB_QUEUE_H_ */

Binary file not shown.

View file

@ -1,12 +0,0 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..c423590 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy

File diff suppressed because it is too large Load diff

View file

@ -35,7 +35,7 @@ fi
rebar get-deps rebar get-deps
file=./deps/riak_kv/src/riak_kv.app.src file=./deps/riak_kv/src/riak_kv.app.src
if ! grep -q wterl $file ; then if ! grep -q hanoidb $file && ! grep -q wterl $file ; then
echo echo
echo "Modifying $file, saving the original as ${file}.orig ..." echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file

View file

@ -1,6 +0,0 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.

View file

@ -1,46 +1,13 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et %% ex: ft=erlang ts=4 sw=4 et
{require_otp_vsn, "R1[567]"}.
{cover_enabled, true}.
%{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
{parse_transform, lager_transform},
debug_info,
{d,'DEBUG',true},
strict_validation,
fail_on_warning,
%warn_missing_spec,
warn_bif_clash,
warn_deprecated_function,
warn_export_all,
warn_export_vars,
warn_exported_vars,
warn_obsolete_guard,
warn_shadow_vars,
warn_untyped_record,
warn_unused_function,
warn_unused_import,
warn_unused_record,
warn_unused_vars
]}.
{xref_checks, [undefined_function_calls, deprecated_function_calls]}.
{deps, [
{lager, "2.*", {git, "git://github.com/basho/lager", {branch, "master"}}}
]}.
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}. {port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"} {"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/system/lib/libwiredtiger.a"}
]}. ]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}. {pre_hooks, [{compile, "c_src/build_deps.sh"}]}.
{post_hooks, [{clean, "c_src/build_deps.sh clean"}]}. {post_hooks, [{clean, "c_src/build_deps.sh clean"}]}.

View file

@ -1,54 +0,0 @@
%% -------------------------------------------------------------------
%%
%% async_nif: An async thread-pool layer for Erlang's NIF API
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F, 1)).

View file

@ -22,7 +22,7 @@
-module(riak_kv_wterl_backend). -module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend). -behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]). -author('Steve Vinoski <steve@basho.com>').
%% KV Backend API %% KV Backend API
-export([api_version/0, -export([api_version/0,
@ -43,7 +43,6 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif. -endif.
-define(API_VERSION, 1). -define(API_VERSION, 1).
@ -51,9 +50,11 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(), -record(state, {conn :: wterl:connection(),
type :: string(), table :: string(),
connection :: wterl:connection()}). session :: wterl:session(),
cursors :: ets:tid(),
partition :: integer()}).
-type state() :: #state{}. -type state() :: #state{}.
-type config() :: [{atom(), term()}]. -type config() :: [{atom(), term()}].
@ -81,79 +82,88 @@ capabilities(_, _) ->
%% @doc Start the wterl backend %% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}. -spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) -> start(Partition, Config) ->
AppStart = %% Get the data root directory
case application:start(wterl) of case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
AppStart = case application:start(wterl) of
ok -> ok ->
ok; ok;
{error, {already_started, _}} -> {error, {already_started, _}} ->
ok; ok;
{error, Reason1} -> {error, Reason} ->
lager:error("Failed to start wterl: ~p", [Reason1]), lager:error("Failed to start wterl: ~p", [Reason]),
{error, Reason1} {error, Reason}
end, end,
case AppStart of case AppStart of
ok -> ok ->
Type = ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
case wterl:config_value(type, Config, "lsm") of SessionMax =
{type, "lsm"} -> "lsm"; case app_helper:get_env(riak_core, ring_creation_size) of
{type, "table"} -> "table"; undefined -> 1024;
{type, "btree"} -> "table"; RingSize when RingSize < 512 -> 1024;
{type, BadType} -> RingSize -> RingSize * 2
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
"lsm";
_ ->
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
"lsm"
end, end,
{ok, Connection} = establish_connection(Config, Type), ConnectionOpts = [Config,
Table = Type ++ ":" ++ integer_to_list(Partition), {create, true},
Compressor = {logging, true},
case wterl:config_value(block_compressor, Config, "snappy") of {transactional, true},
{block_compressor, "snappy"}=C -> [C]; {session_max, SessionMax},
{block_compressor, "none"} -> []; {cache_size, "2GB"},
{block_compressor, none} -> []; {sync, false}
{block_compressor, _} -> [{block_compressor, "snappy"}]; %% {verbose,
_ -> [{block_compressor, "snappy"}] %% ["block", "shared_cache", "ckpt", "evict",
end, %% "evictserver", "fileops", "hazard", "lsm",
TableOpts = %% "mutex", "read", "readserver", "reconcile",
case Type of %% "salvage", "verify", "write"]}
"lsm" -> ],
[{internal_page_max, "128K"}, case wterl_conn:open(DataRoot, ConnectionOpts) of
{leaf_page_max, "16K"}, {ok, ConnRef} ->
{lsm, [ Table = "lsm:wt" ++ integer_to_list(Partition),
{bloom_config, [{leaf_page_max, "8MB"}]}, SessionOpenOpts = [{isolation, "snapshot"}],
{bloom_bit_count, 28}, {ok, SRef} = wterl:session_open(ConnRef, wterl:config_to_bin(SessionOpenOpts)),
{bloom_hash_count, 19}, SessionOpts = [%TODO {block_compressor, "snappy"},
{bloom_oldest, true}, {internal_page_max, "128K"},
{chunk_size, "100MB"}, {leaf_page_max, "256K"},
{merge_threads, 2} {lsm_chunk_size, "256MB"},
]} {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ],
] ++ Compressor; ok = wterl:session_create(SRef, Table, wterl:config_to_bin(SessionOpts)),
"table" -> {ok, #state{conn=ConnRef,
Compressor table=Table,
end, session=SRef,
case wterl:create(Connection, Table, TableOpts) of partition=Partition}};
ok -> {error, ConnReason}=ConnError ->
{ok, #state{table=Table, type=Type, lager:error("Failed to start wterl backend: ~p\n",
connection=Connection}}; [ConnReason]),
{error, Reason3} -> ConnError
{error, Reason3} end;
Error ->
Error
end end
end. end.
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(_State) -> stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) ->
ok. %% The connection is closed by wterl_conn:stop() ok = wterl:session_close(SRef),
wterl_conn:close(ConnRef);
stop(#state{cursors=Cursors}=State) ->
ets:foldl(fun({_Table, Cursor}, _) ->
ok = wterl:cursor_close(Cursor)
end, true, Cursors),
ets:delete(Cursors),
stop(State#state{cursors=undefined}).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{connection=Connection, table=Table}=State) -> get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:get(Connection, Table, WTKey) of case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
not_found -> not_found ->
@ -170,8 +180,9 @@ get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -185,8 +196,9 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Ta
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) -> delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of WTKey = to_object_key(Bucket, Key),
case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -198,14 +210,12 @@ delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=Stat
any(), any(),
[], [],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun), FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder = BucketFolder =
fun() -> fun() ->
case wterl:cursor_open(Connection, Table) of {ok, SRef} = wterl:session_open(ConnRef),
{error, {enoent, _Message}} -> {ok, Cursor} = wterl:cursor_open(SRef, Table),
Acc;
{ok, Cursor} ->
try try
{FoldResult, _} = {FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}), wterl:fold_keys(Cursor, FoldFun, {Acc, []}),
@ -214,8 +224,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Tabl
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor) ok = wterl:cursor_close(Cursor),
end ok = wterl:session_close(SRef)
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -230,7 +240,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Tabl
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}. state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) -> fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by %% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.) %% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts), Bucket = lists:keyfind(bucket, 1, Opts),
@ -247,18 +257,16 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder = KeyFolder =
fun() -> fun() ->
case wterl:cursor_open(Connection, Table) of {ok, SRef} = wterl:session_open(ConnRef),
{error, {enoent, _Message}} -> {ok, Cursor} = wterl:cursor_open(SRef, Table),
Acc;
{ok, Cursor} ->
try try
wterl:fold_keys(Cursor, FoldFun, Acc) wterl:fold_keys(Cursor, FoldFun, Acc)
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor) ok = wterl:cursor_close(Cursor),
end ok = wterl:session_close(SRef)
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -273,30 +281,21 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts), Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder = ObjectFolder =
fun() -> fun() ->
case wterl:cursor_open(Connection, Table) of {ok, SRef} = wterl:session_open(ConnRef),
{error, {enoent, _Message}} -> {ok, Cursor} = wterl:cursor_open(SRef, Table),
Acc;
{ok, Cursor} ->
try try
wterl:fold(Cursor, FoldFun, Acc) wterl:fold(Cursor, FoldFun, Acc)
catch catch
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
case wterl:cursor_close(Cursor) of ok = wterl:cursor_close(Cursor),
ok -> ok = wterl:session_close(SRef)
ok;
{error, {eperm, _}} -> %% TODO: review/fix
ok;
{error, _}=E ->
E
end
end
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -308,12 +307,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Tabl
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{connection=Connection, table=Table}=State) -> drop(#state{table=Table, session=SRef}=State) ->
case wterl:drop(Connection, Table) of case wterl:session_truncate(SRef, Table) of
ok -> ok ->
{ok, State}; {ok, State};
{error, {ebusy, _}} -> %% TODO: review/fix
{ok, State};
Error -> Error ->
{error, Error, State} {error, Error, State}
end. end.
@ -321,44 +318,24 @@ drop(#state{connection=Connection, table=Table}=State) ->
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{connection=Connection, table=Table}) -> is_empty(#state{table=Table, session=SRef}) ->
case wterl:cursor_open(Connection, Table) of {ok, Cursor} = wterl:cursor_open(SRef, Table),
{ok, Cursor} -> try
IsEmpty = not_found =:= wterl:cursor_next(Cursor)
case wterl:cursor_next(Cursor) of after
not_found -> ok = wterl:cursor_close(Cursor)
true;
{error, {eperm, _}} ->
false; % TODO: review/fix this logic
_ ->
false
end,
wterl:cursor_close(Cursor),
IsEmpty;
{error, Reason2} ->
{error, Reason2}
end. end.
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) -> status(#state{table=Table, session=SRef}) ->
[]. {ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table),
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of try
%% {ok, Cursor} -> Stats = fetch_status(Cursor),
%% TheStats = [{stats, Stats}]
%% case fetch_status(Cursor) of after
%% {ok, Stats} -> ok = wterl:cursor_close(Cursor)
%% Stats; end.
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
%% @doc Register an asynchronous callback %% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}. -spec callback(reference(), any(), state()) -> {ok, state()}.
@ -370,85 +347,20 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
%% @private shared_cursor(SRef, Table, #state{cursors=undefined}=State) ->
max_sessions(Config) -> Cursors = ets:new(?MODULE, []),
RingSize = shared_cursor(SRef, Table, State#state{cursors=Cursors});
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of shared_cursor(SRef, Table, #state{cursors=Cursors}=State) ->
undefined -> 1024; case ets:lookup(Cursors, Table) of
Size -> Size [{Table, Cursor}] ->
end, {Cursor, State};
Est = RingSize * erlang:system_info(schedulers),
case Est > 8192 of
true ->
8192;
false ->
case Est < 1024 of
true ->
1024;
false ->
Est
end
end.
%% @private
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
CheckpointSetting =
case Type =:= "lsm" of
true ->
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ -> _ ->
[] Cursor = wterl:cursor_open(SRef, Table),
end; ets:insert(Cursors, {Table, Cursor}),
false -> {Cursor, State}
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2}
end
end. end.
%% @private %% @private
%% Return a function to fold over the buckets on this backend %% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) -> fold_buckets_fun(FoldBucketsFun) ->
@ -556,53 +468,13 @@ from_index_key(LKey) ->
%% @private %% @private
%% Return all status from wterl statistics cursor %% Return all status from wterl statistics cursor
%% fetch_status(Cursor) -> fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}. fetch_status(Cursor, wterl:cursor_next_value(Cursor), []).
%% fetch_status(_Cursor, {error, _}, Acc) -> fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc); lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) -> fetch_status(Cursor, {ok, Stat}, Acc) ->
%% lists:reverse(Acc); [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, {ok, Stat}, Acc) -> fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =
case RequestedSize of
undefined ->
RunningApps = application:which_applications(),
FinalGuess =
case proplists:is_defined(sasl, RunningApps) andalso
proplists:is_defined(os_mon, RunningApps) of
true ->
Memory = memsup:get_system_memory_data(),
TotalRAM = proplists:get_value(system_total_memory, Memory),
FreeRAM = proplists:get_value(free_memory, Memory),
UsedByBeam = proplists:get_value(total, erlang:memory()),
Target = ((TotalRAM - UsedByBeam) div 3),
FirstGuess = (Target - (Target rem (1024 * 1024))),
SecondGuess =
case FirstGuess > FreeRAM of
true -> FreeRAM - (FreeRAM rem (1024 * 1024));
_ -> FirstGuess
end,
case SecondGuess < 1073741824 of %% < 1GB?
true -> "1GB";
false ->
ThirdGuess = SecondGuess div (1024 * 1024),
integer_to_list(ThirdGuess) ++ "MB"
end;
false ->
"1GB"
end,
application:set_env(wterl, cache_size, FinalGuess),
FinalGuess;
Value when is_list(Value) ->
Value;
Value when is_number(Value) ->
integer_to_list(Value)
end,
Size.
%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
@ -610,14 +482,12 @@ size_cache(RequestedSize) ->
-ifdef(TEST). -ifdef(TEST).
simple_test_() -> simple_test_() ->
{ok, CWD} = file:get_cwd(), ?assertCmd("rm -rf test/wterl-backend"),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"), application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []). temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() -> custom_config_test_() ->
{ok, CWD} = file:get_cwd(), ?assertCmd("rm -rf test/wterl-backend"),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""), application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]). temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).

View file

@ -1,26 +0,0 @@
-module(rmdir).
-export([path/1]).
-include_lib("kernel/include/file.hrl").
path(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
case file:read_file_info(FilePath) of
{ok, FileInfo} ->
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end;
{error, _Reason} ->
ok
end
end, Files).

View file

@ -272,16 +272,11 @@ empty_check({Backend, State}) ->
}. }.
setup({BackendMod, Config}) -> setup({BackendMod, Config}) ->
application:start(lager), %% Start the backend
application:start(sasl),
application:start(os_mon),
{ok, S} = BackendMod:start(42, Config), {ok, S} = BackendMod:start(42, Config),
{BackendMod, S}. {BackendMod, S}.
cleanup({BackendMod, S}) -> cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S), ok = BackendMod:stop(S).
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST -endif. % TEST

View file

@ -1,6 +1,6 @@
{application, wterl, {application, wterl,
[ [
{description, "Erlang NIF Wrapper for WiredTiger"}, {description, "Erlang Wrapper for WiredTiger"},
{vsn, "0.9.0"}, {vsn, "0.9.0"},
{registered, []}, {registered, []},
{applications, [ {applications, [

File diff suppressed because it is too large Load diff

View file

@ -30,13 +30,15 @@
%% API %% API
-export([start_link/0, stop/0, -export([start_link/0, stop/0,
open/1, open/2, open/3, is_open/0, get/0, close/1]). open/1, open/2, is_open/0, get/0, close/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, { conn :: wterl:connection() }). -record(state, {
conn :: wterl:connection()
}).
-type config_list() :: [{atom(), any()}]. -type config_list() :: [{atom(), any()}].
@ -53,14 +55,12 @@ stop() ->
gen_server:cast(?MODULE, stop). gen_server:cast(?MODULE, stop).
-spec open(string()) -> {ok, wterl:connection()} | {error, term()}. -spec open(string()) -> {ok, wterl:connection()} | {error, term()}.
-spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
-spec open(string(), config_list(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir) -> open(Dir) ->
open(Dir, [], []). open(Dir, []).
open(Dir, ConnectionConfig) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, [], self()}, infinity). -spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir, ConnectionConfig, SessionConfig) -> open(Dir, Config) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, SessionConfig, self()}, infinity). gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity).
-spec is_open() -> boolean(). -spec is_open() -> boolean().
is_open() -> is_open() ->
@ -82,9 +82,11 @@ init([]) ->
true = wterl_ets:table_ready(), true = wterl_ets:table_ready(),
{ok, #state{}}. {ok, #state{}}.
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) -> handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} = Opts = [{create, true},
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of config_value(cache_size, Config, "100MB"),
config_value(session_max, Config, 100)],
{Reply, NState} = case wterl:conn_open(Dir, wterl:config_to_bin(Opts)) of
{ok, ConnRef}=OK -> {ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
@ -93,7 +95,7 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
{Error, State} {Error, State}
end, end,
{reply, Reply, NState}; {reply, Reply, NState};
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
{reply, {ok, ConnRef}, State}; {reply, {ok, ConnRef}, State};
@ -166,7 +168,11 @@ code_change(_OldVsn, State, _Extra) ->
do_close(undefined) -> do_close(undefined) ->
ok; ok;
do_close(ConnRef) -> do_close(ConnRef) ->
wterl:connection_close(ConnRef). wterl:conn_close(ConnRef).
%% @private
config_value(Key, Config, Default) ->
{Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}.
-ifdef(TEST). -ifdef(TEST).
@ -209,14 +215,14 @@ simple_test_() ->
end}]}. end}]}.
open_one() -> open_one() ->
{ok, Ref} = open("test/wterl-backend", [{create, true}, {session_max, 20},{cache_size, "1MB"}]), {ok, Ref} = open("test/wterl-backend", [{session_max, 20},{cache_size, "1MB"}]),
true = is_open(), true = is_open(),
close(Ref), close(Ref),
false = is_open(), false = is_open(),
ok. ok.
open_and_wait(Pid) -> open_and_wait(Pid) ->
{ok, Ref} = open("test/wterl-backend", [{create, true}]), {ok, Ref} = open("test/wterl-backend"),
Pid ! open, Pid ! open,
receive receive
close -> close ->

View file

@ -1,111 +0,0 @@
%% -------------------------------------------------------------------
%%
%% wterl: Erlang Wrapper for WiredTiger
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(wterl_event_handler).
-behaviour(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% API
-export([start_link/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(PREFIX, "wiredtiger").
%% ====================================================================
%% API
%% ====================================================================
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec stop() -> ok.
stop() ->
gen_server:cast(?MODULE, stop).
%% ====================================================================
%% gen_server callbacks
%% ====================================================================
init([]) ->
wterl:set_event_handler_pid(self()),
{ok, []}.
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({error, {Errno, Message}}, State) ->
log(error, "~s: (~s) ~s", [?PREFIX, Errno, Message]),
{noreply, State};
handle_info({message, Info}, State) ->
log(info, "~s: ~s", [?PREFIX, Info]),
{noreply, State};
handle_info({progress, {Operation, Counter}}, State) ->
log(info, "~s: progress on ~s [~b]", [?PREFIX, Operation, Counter]),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% @private
-spec log(error | info, string(), [any()]) -> ok.
log(Urgency, Format, Args) ->
case proplists:is_defined(lager, application:which_applications()) of
true ->
log(lager, Urgency, Format, Args);
false ->
log(stdio, Urgency, Format, Args)
end.
-spec log(lager | stdio, error | info, string(), [any()]) -> ok.
log(lager, error, Format, Args) ->
lager:error(Format, Args);
log(lager, info, Format, Args) ->
lager:info(Format, Args);
log(stdio, _, Format, Args) ->
io:format(Format ++ "~n", Args).
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
-endif.

View file

@ -46,5 +46,4 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker), {ok, {{one_for_one, 5, 10}, [?CHILD(wterl_ets, worker),
?CHILD(wterl_conn, worker), ?CHILD(wterl_conn, worker)]}}.
?CHILD(wterl_event_handler, worker)]}}.

View file

@ -1,91 +0,0 @@
-module(basho_bench_driver_wterl).
-record(state, { connection, uri }).
-export([new/1,
run/4]).
-include_lib("basho_bench/include/basho_bench.hrl").
%% ====================================================================
%% API
%% ====================================================================
new(1) ->
%% Make sure wterl is available
case code:which(wterl) of
non_existing ->
?FAIL_MSG("~s requires wterl to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
{ok, _} = wterl_sup:start_link(),
setup(1);
new(Id) ->
setup(Id).
setup(Id) ->
%% Get the target directory
Dir = basho_bench_config:get(wterl_dir, "/tmp"),
Config = basho_bench_config:get(wterl, []),
Uri = config_value(table_uri, Config, "lsm:test"),
ConnectionOpts = config_value(connection, Config, [{create,true},{session_max, 8192}]),
SessionOpts = config_value(session, Config, []),
TableOpts = config_value(table, Config, []),
%% Start WiredTiger
Connection =
case wterl_conn:is_open() of
false ->
case wterl_conn:open(Dir, ConnectionOpts, SessionOpts) of
{ok, Conn} ->
Conn;
{error, Reason0} ->
?FAIL_MSG("Failed to establish a WiredTiger connection for ~p, wterl backend unable to start: ~p\n", [Id, Reason0])
end;
true ->
{ok, Conn} = wterl_conn:get(),
Conn
end,
case wterl:create(Connection, Uri, TableOpts) of
ok ->
{ok, #state{connection=Connection, uri=Uri}};
{error, Reason} ->
{error, Reason}
end.
run(get, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:get(Connection, Uri, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:put(Connection, Uri, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:delete(Connection, Uri, KeyGen()) of
ok ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
config_value(Key, Config, Default) ->
case proplists:get_value(Key, Config) of
undefined ->
Default;
Value ->
Value
end.

View file

@ -1,102 +0,0 @@
%%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et
%% How to:
%% * put the wterl-b_b.config file into basho_bench/examples
%% * put the basho_bench_driver_wterl.erl into basho_bench/src
%% * make clean in basho_bench, then make
%% * edit examples/wterl-b_b.config
%% - change {code_paths, ["../wterl"]}. to be a relative path to your
%% wterl directory
%% - change {wterl_dir, "/home/gburd/ws/basho_bench/data"}. to a fully
%% qualified location for your test data files (mkdir that directory
%% yourself, if it doesn't exist the test will fail 'enoent')
%% * to run, replace this path with the proper path on your system:
%% LD_LIBRARY_PATH=/home/you/wterl/priv ./basho_bench examples/wterl-b_b.config
%% * the test should run for 10 minutes (as it is configured right now)
%% with 4 concurrent workers accessing the same table
%%
%% Note:
%% There are two config sections in wt.config {wterl, [ ... ]}. and
%% {wterl_, [ ... ]}. The one being used is named "wterl" the other
%% config is ignored. I setup an LSM and BTREE config and to choose
%% which is run you just rename those two sections (turn one off by
%% adding a "_" to the name and take the "_" out of the other's name).
{mode, max}.
{duration, 10}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
{operations, [{get, 4}, {put, 4}, {delete, 2}]}.
{code_paths, ["../wterl"]}.
{wterl_dir, "/home/gburd/ws/basho_bench/data"}.
%% lsm
{wterl, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]},
{block_compressor, "snappy"} % bzip2
]}
]}.
%% btree
{wterl_, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]},
{checkpoint, [{await, 10}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{block_compressor, "snappy"} % bzip2
]}
]}.

View file

@ -1,10 +0,0 @@
#!/bin/sh -
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
wterl=`git describe --always --long --tags`
wiredtiger0=`(cd c_src/wiredtiger-[0-9.]* && git describe --always --long --tags)`
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
echo $wterl
echo $wiredtiger