From 9f4e08ca6edc9993de78cd0be9ca162284cc86ef Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Sun, 24 Mar 2013 21:00:48 -0400 Subject: [PATCH] Queue and execute work from scheduler threads on other threads to prevent schedulers from sleeping. --- c_src/async_nif.h | 282 ++++++ c_src/queue.h | 648 ++++++++++++++ c_src/wterl.c | 1587 ++++++++++++++++++++++----------- src/async_nif.hrl | 43 + src/riak_kv_wterl_backend.erl | 142 ++- src/temp_riak_kv_backend.erl | 7 +- src/wterl.erl | 187 +++- 7 files changed, 2265 insertions(+), 631 deletions(-) create mode 100644 c_src/async_nif.h create mode 100644 c_src/queue.h create mode 100644 src/async_nif.hrl diff --git a/c_src/async_nif.h b/c_src/async_nif.h new file mode 100644 index 0000000..8781e9d --- /dev/null +++ b/c_src/async_nif.h @@ -0,0 +1,282 @@ +/* + * + * async_nif: An async thread-pool layer for Erlang's NIF API + * + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef __ASYNC_NIF_H__ +#define __ASYNC_NIF_H__ + +#if defined(__cplusplus) +extern "C" { +#endif + +/* Redefine this in your NIF implementation before including this file to + change the thread pool size. The maximum number of threads might be + bounded on your OS. For instance, to allow 1,000,000 threads on a Linux + system you must do the following before launching the process. + echo 1000000 > /proc/sys/kernel/threads-max + and for all UNIX systems there will be ulimit maximums. */ +#ifndef ASYNC_NIF_MAX_WORKERS +#define ASYNC_NIF_MAX_WORKERS 16 +#endif + +#include "queue.h" + +struct async_nif_req_entry { + ERL_NIF_TERM ref, *argv; + ErlNifEnv *env; + ErlNifPid pid; + void *args; + void *priv_data; + void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, void *, ErlNifPid*, void *); + void (*fn_post)(void *); + STAILQ_ENTRY(async_nif_req_entry) entries; +}; +STAILQ_HEAD(reqs, async_nif_req_entry) async_nif_reqs = STAILQ_HEAD_INITIALIZER(async_nif_reqs); + +struct async_nif_worker_entry { + ErlNifTid tid; + LIST_ENTRY(async_nif_worker_entry) entries; +}; +LIST_HEAD(idle_workers, async_nif_worker_entry) async_nif_idle_workers = LIST_HEAD_INITIALIZER(async_nif_worker); + +static volatile unsigned int async_nif_req_count = 0; +static volatile unsigned int async_nif_shutdown = 0; +static ErlNifMutex *async_nif_req_mutex = NULL; +static ErlNifMutex *async_nif_worker_mutex = NULL; +static ErlNifCond *async_nif_cnd = NULL; +static struct async_nif_worker_entry async_nif_worker_entries[ASYNC_NIF_MAX_WORKERS]; + + +#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ + struct decl ## _args frame; \ + static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, void *priv_data, ErlNifPid *pid, struct decl ## _args *args) work_block \ + static void fn_post_ ## decl (struct decl ## _args *args) { \ + do post_block while(0); \ + } \ + static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ + struct decl ## _args on_stack_args; \ + struct decl ## _args *args = &on_stack_args; \ + struct decl ## _args *copy_of_args; \ + struct async_nif_req_entry *req = NULL; \ + ErlNifEnv *new_env = NULL; \ + /* argv[0] is a ref used for selective recv */ \ + const ERL_NIF_TERM *argv = argv_in + 1; \ + argc--; \ + if (async_nif_shutdown) \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "shutdown")); \ + if (!(new_env = enif_alloc_env())) { \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "enomem")); \ + } \ + do pre_block while(0); \ + req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \ + if (!req) { \ + fn_post_ ## decl (args); \ + enif_free_env(new_env); \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "enomem")); \ + } \ + copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ + if (!copy_of_args) { \ + fn_post_ ## decl (args); \ + enif_free_env(new_env); \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "enomem")); \ + } \ + memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ + req->env = new_env; \ + req->ref = enif_make_copy(new_env, argv_in[0]); \ + enif_self(env, &req->pid); \ + req->args = (void*)copy_of_args; \ + req->priv_data = enif_priv_data(env); \ + req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, void*, ErlNifPid*, void *))fn_work_ ## decl ; \ + req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + async_nif_enqueue_req(req); \ + return enif_make_tuple2(env, enif_make_atom(env, "ok"), \ + enif_make_tuple2(env, enif_make_atom(env, "enqueued"), \ + enif_make_int(env, async_nif_req_count))); \ + } + +#define ASYNC_NIF_LOAD() if (async_nif_init() != 0) return -1; +#define ASYNC_NIF_UNLOAD() async_nif_unload(); +#define ASYNC_NIF_UPGRADE() async_nif_unload(); + +#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); +#define ASYNC_NIF_WORK_ENV new_env + +#ifndef PULSE_FORCE_USING_PULSE_SEND_HERE +#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) +#else +#define ASYNC_NIF_REPLY(msg) PULSE_SEND(NULL, pid, env, enif_make_tuple2(env, ref, msg)) +#endif + +static void async_nif_enqueue_req(struct async_nif_req_entry *r) +{ + /* Add the request to the work queue. */ + enif_mutex_lock(async_nif_req_mutex); + STAILQ_INSERT_TAIL(&async_nif_reqs, r, entries); + async_nif_req_count++; + enif_mutex_unlock(async_nif_req_mutex); + enif_cond_broadcast(async_nif_cnd); +} + +static void *async_nif_worker_fn(void *arg) +{ + struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg; + struct async_nif_req_entry *req = NULL; + + /* + * Workers are active while there is work on the queue to do and + * only in the idle list when they are waiting on new work. + */ + for(;;) { + /* Examine the request queue, are there things to be done? */ + enif_mutex_lock(async_nif_req_mutex); + enif_mutex_lock(async_nif_worker_mutex); + LIST_INSERT_HEAD(&async_nif_idle_workers, worker, entries); + enif_mutex_unlock(async_nif_worker_mutex); + check_again_for_work: + if (async_nif_shutdown) { enif_mutex_unlock(async_nif_req_mutex); break; } + if ((req = STAILQ_FIRST(&async_nif_reqs)) == NULL) { + /* Queue is empty, join the list of idle workers and wait for work */ + enif_cond_wait(async_nif_cnd, async_nif_req_mutex); + goto check_again_for_work; + } else { + /* `req` is our work request and we hold the lock. */ + enif_cond_broadcast(async_nif_cnd); + + /* Take the request off the queue. */ + STAILQ_REMOVE(&async_nif_reqs, req, async_nif_req_entry, entries); async_nif_req_count--; + + /* Now we need to remove this thread from the list of idle threads. */ + enif_mutex_lock(async_nif_worker_mutex); + LIST_REMOVE(worker, entries); + + /* Release the locks in reverse order that we acquired them, + so as not to self-deadlock. */ + enif_mutex_unlock(async_nif_worker_mutex); + enif_mutex_unlock(async_nif_req_mutex); + + /* Finally, let's do the work! :) */ + req->fn_work(req->env, req->ref, req->priv_data, &req->pid, req->args); + req->fn_post(req->args); + enif_free(req->args); + enif_free_env(req->env); + enif_free(req); + } + } + enif_thread_exit(0); + return 0; +} + +static void async_nif_unload(void) +{ + unsigned int i; + + /* Signal the worker threads, stop what you're doing and exit. */ + enif_mutex_lock(async_nif_req_mutex); + async_nif_shutdown = 1; + enif_cond_broadcast(async_nif_cnd); + enif_mutex_unlock(async_nif_req_mutex); + + /* Join for the now exiting worker threads. */ + for (i = 0; i < ASYNC_NIF_MAX_WORKERS; ++i) { + void *exit_value = 0; /* Ignore this. */ + enif_thread_join(async_nif_worker_entries[i].tid, &exit_value); + } + + /* We won't get here until all threads have exited. + Patch things up, and carry on. */ + enif_mutex_lock(async_nif_req_mutex); + + /* Worker threads are stopped, now toss anything left in the queue. */ + struct async_nif_req_entry *req = NULL; + STAILQ_FOREACH(req, &async_nif_reqs, entries) { + STAILQ_REMOVE(&async_nif_reqs, STAILQ_LAST(&async_nif_reqs, async_nif_req_entry, entries), + async_nif_req_entry, entries); +#ifdef PULSE + PULSE_SEND(NULL, &req->pid, req->env, + enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), + enif_make_atom(req->env, "shutdown"))); +#else + enif_send(NULL, &req->pid, req->env, + enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), + enif_make_atom(req->env, "shutdown"))); +#endif + req->fn_post(req->args); + enif_free(req->args); + enif_free(req); + async_nif_req_count--; + } + enif_mutex_unlock(async_nif_req_mutex); + + memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0); + enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL; + enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL; + enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL; +} + +static int async_nif_init(void) +{ + int i; + + /* Don't init more than once. */ + if (async_nif_req_mutex) return 0; + + async_nif_req_mutex = enif_mutex_create(NULL); + async_nif_worker_mutex = enif_mutex_create(NULL); + async_nif_cnd = enif_cond_create(NULL); + + /* Setup the requests management. */ + async_nif_req_count = 0; + + /* Setup the thread pool management. */ + enif_mutex_lock(async_nif_worker_mutex); + memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0); + + for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { + if (enif_thread_create(NULL, &async_nif_worker_entries[i].tid, + &async_nif_worker_fn, (void*)&async_nif_worker_entries[i], NULL) != 0) { + async_nif_shutdown = 1; + enif_cond_broadcast(async_nif_cnd); + enif_mutex_unlock(async_nif_worker_mutex); + while(i-- > 0) { + void *exit_value = 0; /* Ignore this. */ + enif_thread_join(async_nif_worker_entries[i].tid, &exit_value); + } + memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0); + enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL; + enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL; + enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL; + return -1; + } + } + enif_mutex_unlock(async_nif_worker_mutex); + return 0; +} + +#if defined(__cplusplus) +} +#endif + +#endif // __ASYNC_NIF_H__ diff --git a/c_src/queue.h b/c_src/queue.h new file mode 100644 index 0000000..383f8c9 --- /dev/null +++ b/c_src/queue.h @@ -0,0 +1,648 @@ +/*- + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 4. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + * $FreeBSD: src/sys/sys/queue.h,v 1.75.2.3 2012/11/17 11:37:26 svnexp Exp $ + */ + +#ifndef _SYS_QUEUE_H_ +#define _SYS_QUEUE_H_ + +#ifndef __offsetof +#define __offsetof(st, m) \ + ((size_t) ( (char *)&((st *)0)->m - (char *)0 )) +#endif + +#ifndef __containerof +#define __containerof(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - __offsetof(type,member) );}) +#endif + +/* + * This file defines four types of data structures: singly-linked lists, + * singly-linked tail queues, lists and tail queues. + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A singly-linked tail queue is headed by a pair of pointers, one to the + * head of the list and the other to the tail of the list. The elements are + * singly linked for minimum space and pointer manipulation overhead at the + * expense of O(n) removal for arbitrary elements. New elements can be added + * to the list after an existing element, at the head of the list, or at the + * end of the list. Elements being removed from the head of the tail queue + * should use the explicit macro for this purpose for optimum efficiency. + * A singly-linked tail queue may only be traversed in the forward direction. + * Singly-linked tail queues are ideal for applications with large datasets + * and few or no removals or for implementing a FIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may be traversed in either direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * For details on the use of these macros, see the queue(3) manual page. + * + * + * SLIST LIST STAILQ TAILQ + * _HEAD + + + + + * _HEAD_INITIALIZER + + + + + * _ENTRY + + + + + * _INIT + + + + + * _EMPTY + + + + + * _FIRST + + + + + * _NEXT + + + + + * _PREV - + - + + * _LAST - - + + + * _FOREACH + + + + + * _FOREACH_SAFE + + + + + * _FOREACH_REVERSE - - - + + * _FOREACH_REVERSE_SAFE - - - + + * _INSERT_HEAD + + + + + * _INSERT_BEFORE - + - + + * _INSERT_AFTER + + + + + * _INSERT_TAIL - - + + + * _CONCAT - - + + + * _REMOVE_AFTER + - + - + * _REMOVE_HEAD + - + - + * _REMOVE + + + + + * _SWAP + + + + + * + */ +#ifdef QUEUE_MACRO_DEBUG +/* Store the last 2 places the queue element or head was altered */ +struct qm_trace { + char * lastfile; + int lastline; + char * prevfile; + int prevline; +}; + +#define TRACEBUF struct qm_trace trace; +#define TRASHIT(x) do {(x) = (void *)-1;} while (0) +#define QMD_SAVELINK(name, link) void **name = (void *)&(link) + +#define QMD_TRACE_HEAD(head) do { \ + (head)->trace.prevline = (head)->trace.lastline; \ + (head)->trace.prevfile = (head)->trace.lastfile; \ + (head)->trace.lastline = __LINE__; \ + (head)->trace.lastfile = __FILE__; \ +} while (0) + +#define QMD_TRACE_ELEM(elem) do { \ + (elem)->trace.prevline = (elem)->trace.lastline; \ + (elem)->trace.prevfile = (elem)->trace.lastfile; \ + (elem)->trace.lastline = __LINE__; \ + (elem)->trace.lastfile = __FILE__; \ +} while (0) + +#else +#define QMD_TRACE_ELEM(elem) +#define QMD_TRACE_HEAD(head) +#define QMD_SAVELINK(name, link) +#define TRACEBUF +#define TRASHIT(x) +#endif /* QUEUE_MACRO_DEBUG */ + +/* + * Singly-linked List declarations. + */ +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} + +#define SLIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define SLIST_ENTRY(type) \ +struct { \ + struct type *sle_next; /* next element */ \ +} + +/* + * Singly-linked List functions. + */ +#define SLIST_EMPTY(head) ((head)->slh_first == NULL) + +#define SLIST_FIRST(head) ((head)->slh_first) + +#define SLIST_FOREACH(var, head, field) \ + for ((var) = SLIST_FIRST((head)); \ + (var); \ + (var) = SLIST_NEXT((var), field)) + +#define SLIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = SLIST_FIRST((head)); \ + (var) && ((tvar) = SLIST_NEXT((var), field), 1); \ + (var) = (tvar)) + +#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \ + for ((varp) = &SLIST_FIRST((head)); \ + ((var) = *(varp)) != NULL; \ + (varp) = &SLIST_NEXT((var), field)) + +#define SLIST_INIT(head) do { \ + SLIST_FIRST((head)) = NULL; \ +} while (0) + +#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \ + SLIST_NEXT((slistelm), field) = (elm); \ +} while (0) + +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \ + SLIST_FIRST((head)) = (elm); \ +} while (0) + +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) + +#define SLIST_REMOVE(head, elm, type, field) do { \ + QMD_SAVELINK(oldnext, (elm)->field.sle_next); \ + if (SLIST_FIRST((head)) == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = SLIST_FIRST((head)); \ + while (SLIST_NEXT(curelm, field) != (elm)) \ + curelm = SLIST_NEXT(curelm, field); \ + SLIST_REMOVE_AFTER(curelm, field); \ + } \ + TRASHIT(*oldnext); \ +} while (0) + +#define SLIST_REMOVE_AFTER(elm, field) do { \ + SLIST_NEXT(elm, field) = \ + SLIST_NEXT(SLIST_NEXT(elm, field), field); \ +} while (0) + +#define SLIST_REMOVE_HEAD(head, field) do { \ + SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \ +} while (0) + +#define SLIST_SWAP(head1, head2, type) do { \ + struct type *swap_first = SLIST_FIRST(head1); \ + SLIST_FIRST(head1) = SLIST_FIRST(head2); \ + SLIST_FIRST(head2) = swap_first; \ +} while (0) + +/* + * Singly-linked Tail queue declarations. + */ +#define STAILQ_HEAD(name, type) \ +struct name { \ + struct type *stqh_first;/* first element */ \ + struct type **stqh_last;/* addr of last next element */ \ +} + +#define STAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).stqh_first } + +#define STAILQ_ENTRY(type) \ +struct { \ + struct type *stqe_next; /* next element */ \ +} + +/* + * Singly-linked Tail queue functions. + */ +#define STAILQ_CONCAT(head1, head2) do { \ + if (!STAILQ_EMPTY((head2))) { \ + *(head1)->stqh_last = (head2)->stqh_first; \ + (head1)->stqh_last = (head2)->stqh_last; \ + STAILQ_INIT((head2)); \ + } \ +} while (0) + +#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL) + +#define STAILQ_FIRST(head) ((head)->stqh_first) + +#define STAILQ_FOREACH(var, head, field) \ + for((var) = STAILQ_FIRST((head)); \ + (var); \ + (var) = STAILQ_NEXT((var), field)) + + +#define STAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = STAILQ_FIRST((head)); \ + (var) && ((tvar) = STAILQ_NEXT((var), field), 1); \ + (var) = (tvar)) + +#define STAILQ_INIT(head) do { \ + STAILQ_FIRST((head)) = NULL; \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_NEXT((tqelm), field) = (elm); \ +} while (0) + +#define STAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_FIRST((head)) = (elm); \ +} while (0) + +#define STAILQ_INSERT_TAIL(head, elm, field) do { \ + STAILQ_NEXT((elm), field) = NULL; \ + *(head)->stqh_last = (elm); \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_LAST(head, type, field) \ + (STAILQ_EMPTY((head)) ? NULL : \ + __containerof((head)->stqh_last, struct type, field.stqe_next)) + +#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next) + +#define STAILQ_REMOVE(head, elm, type, field) do { \ + QMD_SAVELINK(oldnext, (elm)->field.stqe_next); \ + if (STAILQ_FIRST((head)) == (elm)) { \ + STAILQ_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = STAILQ_FIRST((head)); \ + while (STAILQ_NEXT(curelm, field) != (elm)) \ + curelm = STAILQ_NEXT(curelm, field); \ + STAILQ_REMOVE_AFTER(head, curelm, field); \ + } \ + TRASHIT(*oldnext); \ +} while (0) + +#define STAILQ_REMOVE_AFTER(head, elm, field) do { \ + if ((STAILQ_NEXT(elm, field) = \ + STAILQ_NEXT(STAILQ_NEXT(elm, field), field)) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_REMOVE_HEAD(head, field) do { \ + if ((STAILQ_FIRST((head)) = \ + STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_SWAP(head1, head2, type) do { \ + struct type *swap_first = STAILQ_FIRST(head1); \ + struct type **swap_last = (head1)->stqh_last; \ + STAILQ_FIRST(head1) = STAILQ_FIRST(head2); \ + (head1)->stqh_last = (head2)->stqh_last; \ + STAILQ_FIRST(head2) = swap_first; \ + (head2)->stqh_last = swap_last; \ + if (STAILQ_EMPTY(head1)) \ + (head1)->stqh_last = &STAILQ_FIRST(head1); \ + if (STAILQ_EMPTY(head2)) \ + (head2)->stqh_last = &STAILQ_FIRST(head2); \ +} while (0) + + +/* + * List declarations. + */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List functions. + */ + +#if (defined(_KERNEL) && defined(INVARIANTS)) +#define QMD_LIST_CHECK_HEAD(head, field) do { \ + if (LIST_FIRST((head)) != NULL && \ + LIST_FIRST((head))->field.le_prev != \ + &LIST_FIRST((head))) \ + panic("Bad list head %p first->prev != head", (head)); \ +} while (0) + +#define QMD_LIST_CHECK_NEXT(elm, field) do { \ + if (LIST_NEXT((elm), field) != NULL && \ + LIST_NEXT((elm), field)->field.le_prev != \ + &((elm)->field.le_next)) \ + panic("Bad link elm %p next->prev != elm", (elm)); \ +} while (0) + +#define QMD_LIST_CHECK_PREV(elm, field) do { \ + if (*(elm)->field.le_prev != (elm)) \ + panic("Bad link elm %p prev->next != elm", (elm)); \ +} while (0) +#else +#define QMD_LIST_CHECK_HEAD(head, field) +#define QMD_LIST_CHECK_NEXT(elm, field) +#define QMD_LIST_CHECK_PREV(elm, field) +#endif /* (_KERNEL && INVARIANTS) */ + +#define LIST_EMPTY(head) ((head)->lh_first == NULL) + +#define LIST_FIRST(head) ((head)->lh_first) + +#define LIST_FOREACH(var, head, field) \ + for ((var) = LIST_FIRST((head)); \ + (var); \ + (var) = LIST_NEXT((var), field)) + +#define LIST_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = LIST_FIRST((head)); \ + (var) && ((tvar) = LIST_NEXT((var), field), 1); \ + (var) = (tvar)) + +#define LIST_INIT(head) do { \ + LIST_FIRST((head)) = NULL; \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + QMD_LIST_CHECK_NEXT(listelm, field); \ + if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\ + LIST_NEXT((listelm), field)->field.le_prev = \ + &LIST_NEXT((elm), field); \ + LIST_NEXT((listelm), field) = (elm); \ + (elm)->field.le_prev = &LIST_NEXT((listelm), field); \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + QMD_LIST_CHECK_PREV(listelm, field); \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + LIST_NEXT((elm), field) = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &LIST_NEXT((elm), field); \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + QMD_LIST_CHECK_HEAD((head), field); \ + if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \ + LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\ + LIST_FIRST((head)) = (elm); \ + (elm)->field.le_prev = &LIST_FIRST((head)); \ +} while (0) + +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_PREV(elm, head, type, field) \ + ((elm)->field.le_prev == &LIST_FIRST((head)) ? NULL : \ + __containerof((elm)->field.le_prev, struct type, field.le_next)) + +#define LIST_REMOVE(elm, field) do { \ + QMD_SAVELINK(oldnext, (elm)->field.le_next); \ + QMD_SAVELINK(oldprev, (elm)->field.le_prev); \ + QMD_LIST_CHECK_NEXT(elm, field); \ + QMD_LIST_CHECK_PREV(elm, field); \ + if (LIST_NEXT((elm), field) != NULL) \ + LIST_NEXT((elm), field)->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = LIST_NEXT((elm), field); \ + TRASHIT(*oldnext); \ + TRASHIT(*oldprev); \ +} while (0) + +#define LIST_SWAP(head1, head2, type, field) do { \ + struct type *swap_tmp = LIST_FIRST((head1)); \ + LIST_FIRST((head1)) = LIST_FIRST((head2)); \ + LIST_FIRST((head2)) = swap_tmp; \ + if ((swap_tmp = LIST_FIRST((head1))) != NULL) \ + swap_tmp->field.le_prev = &LIST_FIRST((head1)); \ + if ((swap_tmp = LIST_FIRST((head2))) != NULL) \ + swap_tmp->field.le_prev = &LIST_FIRST((head2)); \ +} while (0) + +/* + * Tail queue declarations. + */ +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ + TRACEBUF \ +} + +#define TAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).tqh_first } + +#define TAILQ_ENTRY(type) \ +struct { \ + struct type *tqe_next; /* next element */ \ + struct type **tqe_prev; /* address of previous next element */ \ + TRACEBUF \ +} + +/* + * Tail queue functions. + */ +#if (defined(_KERNEL) && defined(INVARIANTS)) +#define QMD_TAILQ_CHECK_HEAD(head, field) do { \ + if (!TAILQ_EMPTY(head) && \ + TAILQ_FIRST((head))->field.tqe_prev != \ + &TAILQ_FIRST((head))) \ + panic("Bad tailq head %p first->prev != head", (head)); \ +} while (0) + +#define QMD_TAILQ_CHECK_TAIL(head, field) do { \ + if (*(head)->tqh_last != NULL) \ + panic("Bad tailq NEXT(%p->tqh_last) != NULL", (head)); \ +} while (0) + +#define QMD_TAILQ_CHECK_NEXT(elm, field) do { \ + if (TAILQ_NEXT((elm), field) != NULL && \ + TAILQ_NEXT((elm), field)->field.tqe_prev != \ + &((elm)->field.tqe_next)) \ + panic("Bad link elm %p next->prev != elm", (elm)); \ +} while (0) + +#define QMD_TAILQ_CHECK_PREV(elm, field) do { \ + if (*(elm)->field.tqe_prev != (elm)) \ + panic("Bad link elm %p prev->next != elm", (elm)); \ +} while (0) +#else +#define QMD_TAILQ_CHECK_HEAD(head, field) +#define QMD_TAILQ_CHECK_TAIL(head, headname) +#define QMD_TAILQ_CHECK_NEXT(elm, field) +#define QMD_TAILQ_CHECK_PREV(elm, field) +#endif /* (_KERNEL && INVARIANTS) */ + +#define TAILQ_CONCAT(head1, head2, field) do { \ + if (!TAILQ_EMPTY(head2)) { \ + *(head1)->tqh_last = (head2)->tqh_first; \ + (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ + (head1)->tqh_last = (head2)->tqh_last; \ + TAILQ_INIT((head2)); \ + QMD_TRACE_HEAD(head1); \ + QMD_TRACE_HEAD(head2); \ + } \ +} while (0) + +#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) + +#define TAILQ_FIRST(head) ((head)->tqh_first) + +#define TAILQ_FOREACH(var, head, field) \ + for ((var) = TAILQ_FIRST((head)); \ + (var); \ + (var) = TAILQ_NEXT((var), field)) + +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST((head)); \ + (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ + (var) = (tvar)) + +#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \ + for ((var) = TAILQ_LAST((head), headname); \ + (var); \ + (var) = TAILQ_PREV((var), headname, field)) + +#define TAILQ_FOREACH_REVERSE_SAFE(var, head, headname, field, tvar) \ + for ((var) = TAILQ_LAST((head), headname); \ + (var) && ((tvar) = TAILQ_PREV((var), headname, field), 1); \ + (var) = (tvar)) + +#define TAILQ_INIT(head) do { \ + TAILQ_FIRST((head)) = NULL; \ + (head)->tqh_last = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ +} while (0) + +#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + QMD_TAILQ_CHECK_NEXT(listelm, field); \ + if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else { \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + } \ + TAILQ_NEXT((listelm), field) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \ + QMD_TAILQ_CHECK_PREV(listelm, field); \ + (elm)->field.tqe_prev = (listelm)->field.tqe_prev; \ + TAILQ_NEXT((elm), field) = (listelm); \ + *(listelm)->field.tqe_prev = (elm); \ + (listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_HEAD(head, elm, field) do { \ + QMD_TAILQ_CHECK_HEAD(head, field); \ + if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \ + TAILQ_FIRST((head))->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + TAILQ_FIRST((head)) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_INSERT_TAIL(head, elm, field) do { \ + QMD_TAILQ_CHECK_TAIL(head, field); \ + TAILQ_NEXT((elm), field) = NULL; \ + (elm)->field.tqe_prev = (head)->tqh_last; \ + *(head)->tqh_last = (elm); \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_LAST(head, headname) \ + (*(((struct headname *)((head)->tqh_last))->tqh_last)) + +#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) + +#define TAILQ_PREV(elm, headname, field) \ + (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) + +#define TAILQ_REMOVE(head, elm, field) do { \ + QMD_SAVELINK(oldnext, (elm)->field.tqe_next); \ + QMD_SAVELINK(oldprev, (elm)->field.tqe_prev); \ + QMD_TAILQ_CHECK_NEXT(elm, field); \ + QMD_TAILQ_CHECK_PREV(elm, field); \ + if ((TAILQ_NEXT((elm), field)) != NULL) \ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + (elm)->field.tqe_prev; \ + else { \ + (head)->tqh_last = (elm)->field.tqe_prev; \ + QMD_TRACE_HEAD(head); \ + } \ + *(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \ + TRASHIT(*oldnext); \ + TRASHIT(*oldprev); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_SWAP(head1, head2, type, field) do { \ + struct type *swap_first = (head1)->tqh_first; \ + struct type **swap_last = (head1)->tqh_last; \ + (head1)->tqh_first = (head2)->tqh_first; \ + (head1)->tqh_last = (head2)->tqh_last; \ + (head2)->tqh_first = swap_first; \ + (head2)->tqh_last = swap_last; \ + if ((swap_first = (head1)->tqh_first) != NULL) \ + swap_first->field.tqe_prev = &(head1)->tqh_first; \ + else \ + (head1)->tqh_last = &(head1)->tqh_first; \ + if ((swap_first = (head2)->tqh_first) != NULL) \ + swap_first->field.tqe_prev = &(head2)->tqh_first; \ + else \ + (head2)->tqh_last = &(head2)->tqh_first; \ +} while (0) + +#endif /* !_SYS_QUEUE_H_ */ diff --git a/c_src/wterl.c b/c_src/wterl.c index 5e75998..9636465 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -26,6 +26,7 @@ #include #include "wiredtiger.h" +#include "async_nif.h" static ErlNifResourceType* wterl_conn_RESOURCE; static ErlNifResourceType* wterl_session_RESOURCE; @@ -43,83 +44,12 @@ typedef struct { WT_CURSOR* cursor; } WterlCursorHandle; -typedef char Uri[128]; // object names +typedef char Uri[128]; // object names // Atoms (initialized in on_load) static ERL_NIF_TERM ATOM_ERROR; static ERL_NIF_TERM ATOM_OK; -typedef ERL_NIF_TERM (*CursorRetFun)(ErlNifEnv* env, WT_CURSOR* cursor, int rc); - -// Prototypes -static ERL_NIF_TERM wterl_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); -static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); -static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], - CursorRetFun cursor_ret_fun, int next); -static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near); -static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); -static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); - -static ErlNifFunc nif_funcs[] = -{ - {"conn_close", 1, wterl_conn_close}, - {"conn_open", 2, wterl_conn_open}, - {"cursor_close", 1, wterl_cursor_close}, - {"cursor_insert", 3, wterl_cursor_insert}, - {"cursor_next", 1, wterl_cursor_next}, - {"cursor_next_key", 1, wterl_cursor_next_key}, - {"cursor_next_value", 1, wterl_cursor_next_value}, - {"cursor_open", 2, wterl_cursor_open}, - {"cursor_prev", 1, wterl_cursor_prev}, - {"cursor_prev_key", 1, wterl_cursor_prev_key}, - {"cursor_prev_value", 1, wterl_cursor_prev_value}, - {"cursor_remove", 2, wterl_cursor_remove}, - {"cursor_reset", 1, wterl_cursor_reset}, - {"cursor_search", 2, wterl_cursor_search}, - {"cursor_search_near", 2, wterl_cursor_search_near}, - {"cursor_update", 3, wterl_cursor_update}, - {"session_checkpoint", 2, wterl_session_checkpoint}, - {"session_close", 1, wterl_session_close}, - {"session_create", 3, wterl_session_create}, - {"session_delete", 3, wterl_session_delete}, - {"session_drop", 3, wterl_session_drop}, - {"session_get", 3, wterl_session_get}, - {"session_open", 2, wterl_session_open}, - {"session_put", 4, wterl_session_put}, - {"session_rename", 4, wterl_session_rename}, - {"session_salvage", 3, wterl_session_salvage}, - {"session_truncate", 3, wterl_session_truncate}, - {"session_upgrade", 3, wterl_session_upgrade}, - {"session_verify", 3, wterl_session_verify}, -}; static inline ERL_NIF_TERM wterl_strerror(ErlNifEnv* env, int rc) { @@ -129,352 +59,673 @@ static inline ERL_NIF_TERM wterl_strerror(ErlNifEnv* env, int rc) enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1)); } -static ERL_NIF_TERM wterl_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - ErlNifBinary config; +ASYNC_NIF_DECL( + wterl_conn_open, + { // struct + + ERL_NIF_TERM config; char homedir[4096]; - if (enif_get_string(env, argv[0], homedir, sizeof homedir, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[1], &config)) - { - WT_CONNECTION* conn; - int rc = wiredtiger_open(homedir, NULL, (const char*)config.data, &conn); - if (rc == 0) - { - WterlConnHandle* conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); - conn_handle->conn = conn; - ERL_NIF_TERM result = enif_make_resource(env, conn_handle); - enif_release_resource(conn_handle); - return enif_make_tuple2(env, ATOM_OK, result); - } - else - { - return wterl_strerror(env, rc); - } + }, + { // pre + + if (!(argc == 2 && + enif_get_string(env, argv[0], args->homedir, sizeof args->homedir, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); } - return enif_make_badarg(env); -} + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + }, + { // work -static ERL_NIF_TERM wterl_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - WterlConnHandle* conn_handle; - if (enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&conn_handle)) - { - WT_CONNECTION* conn = conn_handle->conn; - int rc = conn->close(conn, NULL); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } - return enif_make_badarg(env); -} - -#define WTERL_OP_CREATE 1 -#define WTERL_OP_DROP 2 -#define WTERL_OP_SALVAGE 3 -#define WTERL_OP_TRUNCATE 4 -#define WTERL_OP_UPGRADE 5 -#define WTERL_OP_VERIFY 6 - -static inline ERL_NIF_TERM wterl_session_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op) -{ - WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) - { - WT_SESSION* session = session_handle->session; - int rc; - Uri uri; - ErlNifBinary config; - if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[2], &config)) - { - switch (op) - { - case WTERL_OP_CREATE: - rc = session->create(session, uri, (const char*)config.data); - break; - case WTERL_OP_DROP: - rc = session->drop(session, uri, (const char*)config.data); - break; - case WTERL_OP_SALVAGE: - rc = session->salvage(session, uri, (const char*)config.data); - break; - case WTERL_OP_TRUNCATE: - // Ignore the cursor start/stop form of truncation for now, - // support only the full file truncation. - rc = session->truncate(session, uri, NULL, NULL, (const char*)config.data); - break; - case WTERL_OP_UPGRADE: - rc = session->upgrade(session, uri, (const char*)config.data); - break; - case WTERL_OP_VERIFY: - default: - rc = session->verify(session, uri, (const char*)config.data); - break; - } - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } - } - return enif_make_badarg(env); -} - -static ERL_NIF_TERM wterl_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - WterlConnHandle* conn_handle; + WT_CONNECTION* conn; ErlNifBinary config; - if (enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&conn_handle) && - enif_inspect_binary(env, argv[1], &config)) - { - WT_CONNECTION* conn = conn_handle->conn; - WT_SESSION* session; - int rc = conn->open_session(conn, NULL, (const char *)config.data, &session); - if (rc == 0) - { - WterlSessionHandle* session_handle = - enif_alloc_resource(wterl_session_RESOURCE, sizeof(WterlSessionHandle)); - session_handle->session = session; - ERL_NIF_TERM result = enif_make_resource(env, session_handle); - enif_keep_resource(conn_handle); - enif_release_resource(session_handle); - return enif_make_tuple2(env, ATOM_OK, result); - } - else - { - return wterl_strerror(env, rc); - } + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; } - return enif_make_badarg(env); -} -static ERL_NIF_TERM wterl_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) + int rc = wiredtiger_open(args->homedir, NULL, (const char*)config.data, &conn); + if (rc == 0) { - WT_SESSION* session = session_handle->session; - int rc = session->close(session, NULL); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + WterlConnHandle* conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); + conn_handle->conn = conn; + ERL_NIF_TERM result = enif_make_resource(env, conn_handle); + enif_release_resource(conn_handle); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); } - return enif_make_badarg(env); -} - -static ERL_NIF_TERM wterl_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_CREATE); -} - -static ERL_NIF_TERM wterl_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_DROP); -} - -static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) + else { - ErlNifBinary config; - Uri oldname, newname; - if (enif_get_string(env, argv[1], oldname, sizeof oldname, ERL_NIF_LATIN1) && - enif_get_string(env, argv[2], newname, sizeof newname, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[3], &config)) - { - WT_SESSION* session = session_handle->session; - int rc = session->rename(session, oldname, newname, (const char*)config.data); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); } - return enif_make_badarg(env); -} + }, + { // post -static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_SALVAGE); -} + }); -static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - WterlSessionHandle* session_handle; +ASYNC_NIF_DECL( + wterl_conn_close, + { // struct + + WterlConnHandle* conn_handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->conn_handle); + }, + { // work + + WT_CONNECTION* conn = args->conn_handle->conn; + int rc = conn->close(conn, NULL); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->conn_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_open, + { // struct + + ERL_NIF_TERM config; + WterlConnHandle* conn_handle; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + enif_keep_resource((void*)args->conn_handle); + }, + { // work + + WT_CONNECTION* conn = args->conn_handle->conn; + WT_SESSION* session; ErlNifBinary config; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle) && - enif_inspect_binary(env, argv[1], &config)) - { - WT_SESSION* session = session_handle->session; - int rc = session->checkpoint(session, (const char*)config.data); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; } - return enif_make_badarg(env); -} + int rc = conn->open_session(conn, NULL, (const char*)config.data, &session); + if (rc == 0) + { + WterlSessionHandle* session_handle = + enif_alloc_resource(wterl_session_RESOURCE, sizeof(WterlSessionHandle)); + session_handle->session = session; + ERL_NIF_TERM result = enif_make_resource(env, session_handle); + enif_keep_resource(args->conn_handle); + enif_release_resource(session_handle); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); + } + else + { + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + } + }, + { // post -static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_TRUNCATE); -} + enif_release_resource((void*)args->conn_handle); + }); -static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_UPGRADE); -} +ASYNC_NIF_DECL( + wterl_session_close, + { // struct -static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_session_worker(env, argc, argv, WTERL_OP_VERIFY); -} - -static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) - { - Uri uri; - ErlNifBinary key; - if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[2], &key)) - { - WT_SESSION* session = session_handle->session; - WT_CURSOR* cursor; - int rc = session->open_cursor(session, uri, NULL, "raw", &cursor); - if (rc != 0) - { - return wterl_strerror(env, rc); - } - WT_ITEM raw_key; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - rc = cursor->remove(cursor); - cursor->close(cursor); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } - } - return enif_make_badarg(env); -} + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + int rc = session->close(session, NULL); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_create, + { // struct -static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) - { - Uri uri; - ErlNifBinary key; - if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[2], &key)) - { - WT_SESSION* session = session_handle->session; - WT_CURSOR* cursor; - int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); - if (rc != 0) - { - return wterl_strerror(env, rc); - } - WT_ITEM raw_key, raw_value; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - rc = cursor->search(cursor); - if (rc == 0) - { - rc = cursor->get_value(cursor, &raw_value); - if (rc == 0) - { - ERL_NIF_TERM value; - unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value); - memcpy(bin, raw_value.data, raw_value.size); - cursor->close(cursor); - return enif_make_tuple2(env, ATOM_OK, value); - } - } - cursor->close(cursor); - return wterl_strerror(env, rc); - } - } - return enif_make_badarg(env); -} + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->create(session, args->uri, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_drop, + { // struct -static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) - { - Uri uri; - ErlNifBinary key, value; - if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && - enif_inspect_binary(env, argv[2], &key) && - enif_inspect_binary(env, argv[3], &value)) - { - WT_SESSION* session = session_handle->session; - WT_CURSOR* cursor; - int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); - if (rc != 0) - { - return wterl_strerror(env, rc); - } - WT_ITEM raw_key, raw_value; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - raw_value.data = value.data; - raw_value.size = value.size; - cursor->set_value(cursor, &raw_value); - rc = cursor->insert(cursor); - cursor->close(cursor); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } - } - return enif_make_badarg(env); -} + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->drop(session, args->uri, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_rename, + { // struct -static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlSessionHandle* session_handle; - if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) - { - WT_CURSOR* cursor; - Uri uri; - if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1)) - { - WT_SESSION* session = session_handle->session; - int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); - if (rc == 0) - { - WterlCursorHandle* cursor_handle = - enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); - cursor_handle->cursor = cursor; - ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); - enif_keep_resource(session_handle); - enif_release_resource(cursor_handle); - return enif_make_tuple2(env, ATOM_OK, result); - } - else - { - return wterl_strerror(env, rc); - } - } - } - return enif_make_badarg(env); -} + ERL_NIF_TERM config; + Uri oldname; + Uri newname; + }, + { // pre + + if (!(argc == 4 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) && + enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[3]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_salvage, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->salvage(session, args->uri, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_checkpoint, + { // struct + + WterlSessionHandle* session_handle; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->checkpoint(session, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_truncate, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + // Ignore the cursor start/stop form of truncation for now, + // support only the full file truncation. + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->truncate(session, args->uri, NULL, NULL, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_upgrade, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->upgrade(session, args->uri, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_verify, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM config; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary config; + if (!enif_inspect_binary(env, args->config, &config)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + int rc = session->verify(session, args->uri, (const char*)config.data); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_delete, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM key; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary key; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_CURSOR* cursor; + int rc = session->open_cursor(session, args->uri, NULL, "raw", &cursor); + if (rc != 0) + { + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + return; + } + WT_ITEM raw_key; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + rc = cursor->remove(cursor); + cursor->close(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_get, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM key; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary key; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_CURSOR* cursor; + int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); + if (rc != 0) + { + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + return; + } + WT_ITEM raw_key; + WT_ITEM raw_value; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + rc = cursor->search(cursor); + if (rc == 0) + { + rc = cursor->get_value(cursor, &raw_value); + if (rc == 0) + { + ERL_NIF_TERM value; + unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value); + memcpy(bin, raw_value.data, raw_value.size); + cursor->close(cursor); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); + return; + } + } + cursor->close(cursor); + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_session_put, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + ERL_NIF_TERM key; + ERL_NIF_TERM value; + }, + { // pre + + if (!(argc == 4 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && + enif_is_binary(env, argv[2]) && + enif_is_binary(env, argv[3]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + ErlNifBinary key; + ErlNifBinary value; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + if (!enif_inspect_binary(env, args->value, &value)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_CURSOR* cursor; + int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); + if (rc != 0) + { + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + return; + } + WT_ITEM raw_key; + WT_ITEM raw_value; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + raw_value.data = value.data; + raw_value.size = value.size; + cursor->set_value(cursor, &raw_value); + rc = cursor->insert(cursor); + cursor->close(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_open, + { // struct + + WterlSessionHandle* session_handle; + Uri uri; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && + enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->session_handle); + }, + { // work + + WT_SESSION* session = args->session_handle->session; + WT_CURSOR* cursor; + int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); + if (rc == 0) + { + WterlCursorHandle* cursor_handle = + enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); + cursor_handle->cursor = cursor; + ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); + enif_keep_resource(args->session_handle); + enif_release_resource(cursor_handle); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); + } + else + { + ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + } + }, + { // post + + enif_release_resource((void*)args->session_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_close, + { // struct -static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlCursorHandle *cursor_handle; - if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) - { - WT_CURSOR* cursor = cursor_handle->cursor; - int rc = cursor->close(cursor); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); } - return enif_make_badarg(env); -} + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + int rc = cursor->close(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc) { if (rc == 0) { - WT_ITEM raw_key; - rc = cursor->get_key(cursor, &raw_key); - if (rc == 0) + WT_ITEM raw_key; + rc = cursor->get_key(cursor, &raw_key); + if (rc == 0) { - ERL_NIF_TERM key; - memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); - return enif_make_tuple2(env, ATOM_OK, key); - } + ERL_NIF_TERM key; + memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); + return enif_make_tuple2(env, ATOM_OK, key); + } } return wterl_strerror(env, rc); } @@ -484,16 +735,16 @@ static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int r if (rc == 0) { WT_ITEM raw_key, raw_value; - rc = cursor->get_key(cursor, &raw_key); - if (rc == 0) + rc = cursor->get_key(cursor, &raw_key); + if (rc == 0) { rc = cursor->get_value(cursor, &raw_value); if (rc == 0) { - ERL_NIF_TERM key, value; - memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); - memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); - return enif_make_tuple3(env, ATOM_OK, key, value); + ERL_NIF_TERM key, value; + memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); + memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); + return enif_make_tuple3(env, ATOM_OK, key, value); } } } @@ -504,164 +755,407 @@ static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, in { if (rc == 0) { - WT_ITEM raw_value; - rc = cursor->get_value(cursor, &raw_value); - if (rc == 0) + WT_ITEM raw_value; + rc = cursor->get_value(cursor, &raw_value); + if (rc == 0) { - ERL_NIF_TERM value; - memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); - return enif_make_tuple2(env, ATOM_OK, value); - } + ERL_NIF_TERM value; + memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); + return enif_make_tuple2(env, ATOM_OK, value); + } } return wterl_strerror(env, rc); } -static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], - CursorRetFun cursor_ret, int prev) -{ +ASYNC_NIF_DECL( + wterl_cursor_next, + { // struct + WterlCursorHandle *cursor_handle; - if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) - { - WT_CURSOR* cursor = cursor_handle->cursor; - return cursor_ret(env, cursor, prev == 0 ? cursor->next(cursor) : cursor->prev(cursor)); + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); } - return enif_make_badarg(env); -} + enif_keep_resource((void*)args->cursor_handle); + }, + { // work -static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 0); -} + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_kv_ret(env, cursor, cursor->next(cursor))); + }, + { // post -static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 0); -} + enif_release_resource((void*)args->cursor_handle); + }); -static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 0); -} +ASYNC_NIF_DECL( + wterl_cursor_next_key, + { // struct -static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 1); -} - -static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 1); -} - -static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 1); -} - -static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near) -{ WterlCursorHandle *cursor_handle; + }, + { // pre + + if (!(enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_key_ret(env, cursor, cursor->next(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_next_value, + { // struct + + WterlCursorHandle *cursor_handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->next(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_prev, + { // struct + + WterlCursorHandle *cursor_handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_kv_ret(env, cursor, cursor->prev(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_prev_key, + { // struct + + WterlCursorHandle *cursor_handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_key_ret(env, cursor, cursor->prev(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_prev_value, + { // struct + + WterlCursorHandle *cursor_handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->prev(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_search, + { // struct + + WterlCursorHandle *cursor_handle; + ERL_NIF_TERM key; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; ErlNifBinary key; - if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle) && - enif_inspect_binary(env, argv[1], &key)) - { - WT_CURSOR* cursor = cursor_handle->cursor; - WT_ITEM raw_key; - int exact; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - - // We currently ignore the less-than, greater-than or equals-to return information - // from the cursor.search_near method. - return wterl_cursor_value_ret(env, cursor, - near == 1 ? - cursor->search_near(cursor, &exact) : cursor->search(cursor)); + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; } - return enif_make_badarg(env); -} + WT_ITEM raw_key; -static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_search_worker(env, argc, argv, 0); -} + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); -static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_search_worker(env, argc, argv, 1); -} + // We currently ignore the less-than, greater-than or equals-to return information + // from the cursor.search_near method. + ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->search(cursor))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_search_near, + { // struct -static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ WterlCursorHandle *cursor_handle; - if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) - { - WT_CURSOR* cursor = cursor_handle->cursor; - int rc = cursor->reset(cursor); - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + ERL_NIF_TERM key; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); } - return enif_make_badarg(env); -} + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + enif_keep_resource((void*)args->cursor_handle); + }, + { // work -#define WTERL_OP_CURSOR_INSERT 1 -#define WTERL_OP_CURSOR_UPDATE 2 -#define WTERL_OP_CURSOR_REMOVE 3 + WT_CURSOR* cursor = args->cursor_handle->cursor; + ErlNifBinary key; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_ITEM raw_key; + int exact; + + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + + // We currently ignore the less-than, greater-than or equals-to return information + // from the cursor.search_near method. + ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->search_near(cursor, &exact))); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_reset, + { // struct -static inline ERL_NIF_TERM wterl_cursor_data_op(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op) -{ WterlCursorHandle *cursor_handle; - if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) - { - ErlNifBinary key, value; - int rc; + }, + { // pre - if (enif_inspect_binary(env, argv[1], &key) && - (op == WTERL_OP_CURSOR_REMOVE ? 1 : enif_inspect_binary(env, argv[2], &value))) - { - WT_CURSOR* cursor = cursor_handle->cursor; - WT_ITEM raw_key, raw_value; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - if (op != WTERL_OP_CURSOR_REMOVE) - { - raw_value.data = value.data; - raw_value.size = value.size; - cursor->set_value(cursor, &raw_value); - } - switch (op) - { - case WTERL_OP_CURSOR_INSERT: - rc = cursor->insert(cursor); - break; - case WTERL_OP_CURSOR_UPDATE: - rc = cursor->update(cursor); - break; - case WTERL_OP_CURSOR_REMOVE: - default: - rc = cursor->remove(cursor); - break; - } - return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); - } + if (!(argc == 1 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { + ASYNC_NIF_RETURN_BADARG(); } - return enif_make_badarg(env); -} + enif_keep_resource((void*)args->cursor_handle); + }, + { // work -static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_INSERT); -} + WT_CURSOR* cursor = args->cursor_handle->cursor; + int rc = cursor->reset(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post -static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_UPDATE); -} + enif_release_resource((void*)args->cursor_handle); + }); -static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) -{ - return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_REMOVE); -} +ASYNC_NIF_DECL( + wterl_cursor_insert, + { // struct + + WterlCursorHandle *cursor_handle; + ERL_NIF_TERM key; + ERL_NIF_TERM value; + int rc; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && + enif_is_binary(env, argv[1]) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ErlNifBinary key; + ErlNifBinary value; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + if (!enif_inspect_binary(env, args->value, &value)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_ITEM raw_key; + WT_ITEM raw_value; + + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + raw_value.data = value.data; + raw_value.size = value.size; + cursor->set_value(cursor, &raw_value); + int rc = cursor->insert(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_update, + { // struct + + WterlCursorHandle *cursor_handle; + ERL_NIF_TERM key; + ERL_NIF_TERM value; + int rc; + }, + { // pre + + if (!(argc == 3 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && + enif_is_binary(env, argv[1]) && + enif_is_binary(env, argv[2]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ErlNifBinary key; + ErlNifBinary value; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + if (!enif_inspect_binary(env, args->value, &value)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_ITEM raw_key; + WT_ITEM raw_value; + + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + raw_value.data = value.data; + raw_value.size = value.size; + cursor->set_value(cursor, &raw_value); + int rc = cursor->update(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); + +ASYNC_NIF_DECL( + wterl_cursor_remove, + { // struct + + WterlCursorHandle *cursor_handle; + ERL_NIF_TERM key; + int rc; + }, + { // pre + + if (!(argc == 2 && + enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && + enif_is_binary(env, argv[1]))) { + ASYNC_NIF_RETURN_BADARG(); + } + args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); + enif_keep_resource((void*)args->cursor_handle); + }, + { // work + + WT_CURSOR* cursor = args->cursor_handle->cursor; + ErlNifBinary key; + if (!enif_inspect_binary(env, args->key, &key)) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } + WT_ITEM raw_key; + + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + int rc = cursor->remove(cursor); + ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); + }, + { // post + + enif_release_resource((void*)args->cursor_handle); + }); static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { @@ -671,7 +1165,54 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL); ATOM_ERROR = enif_make_atom(env, "error"); ATOM_OK = enif_make_atom(env, "ok"); + + ASYNC_NIF_LOAD(); + return 0; } -ERL_NIF_INIT(wterl, nif_funcs, &on_load, NULL, NULL, NULL); +static void on_unload(ErlNifEnv* env, void* priv_data) +{ + ASYNC_NIF_UNLOAD(); +} + +static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info) +{ + ASYNC_NIF_UPGRADE(); + return 0; +} + +static ErlNifFunc nif_funcs[] = +{ + {"conn_open_nif", 3, wterl_conn_open}, + {"conn_close_nif", 2, wterl_conn_close}, + {"session_open_nif", 3, wterl_session_open}, + {"session_close_nif", 2, wterl_session_close}, + {"session_create_nif", 4, wterl_session_create}, + {"session_drop_nif", 4, wterl_session_drop}, + {"session_rename_nif", 5, wterl_session_rename}, + {"session_salvage_nif", 4, wterl_session_salvage}, + {"session_checkpoint_nif", 3, wterl_session_checkpoint}, + {"session_truncate_nif", 4, wterl_session_truncate}, + {"session_upgrade_nif", 4, wterl_session_upgrade}, + {"session_verify_nif", 4, wterl_session_verify}, + {"session_delete_nif", 4, wterl_session_delete}, + {"session_get_nif", 4, wterl_session_get}, + {"session_put_nif", 5, wterl_session_put}, + {"cursor_open_nif", 3, wterl_cursor_open}, + {"cursor_close_nif", 2, wterl_cursor_close}, + {"cursor_next_nif", 2, wterl_cursor_next}, + {"cursor_next_key_nif", 2, wterl_cursor_next_key}, + {"cursor_next_value_nif", 2, wterl_cursor_next_value}, + {"cursor_prev_nif", 2, wterl_cursor_prev}, + {"cursor_prev_key_nif", 2, wterl_cursor_prev_key}, + {"cursor_prev_value_nif", 2, wterl_cursor_prev_value}, + {"cursor_search_nif", 3, wterl_cursor_search}, + {"cursor_search_near_nif", 3, wterl_cursor_search_near}, + {"cursor_reset_nif", 2, wterl_cursor_reset}, + {"cursor_insert_nif", 4, wterl_cursor_insert}, + {"cursor_update_nif", 4, wterl_cursor_update}, + {"cursor_remove_nif", 3, wterl_cursor_remove}, +}; + +ERL_NIF_INIT(wterl, nif_funcs, &on_load, NULL, &on_upgrade, &on_unload); diff --git a/src/async_nif.hrl b/src/async_nif.hrl new file mode 100644 index 0000000..8eda423 --- /dev/null +++ b/src/async_nif.hrl @@ -0,0 +1,43 @@ +%% ------------------------------------------------------------------- +%% +%% async_nif: An async thread-pool layer for Erlang's NIF API +%% +%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. +%% Author: Gregory Burd +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-define(ASYNC_NIF_CALL(Fun, Args), + begin + NIFRef = erlang:make_ref(), + case erlang:apply(Fun, [NIFRef|Args]) of + {ok, {enqueued, QDepth}} -> + [erlang:bump_reductions(10 * QDepth) || is_integer(QDepth), QDepth > 100], + receive + {NIFRef, {error, shutdown}=Error} -> + %% Work unit was queued, but not executed. + Error; + {NIFRef, {error, _Reason}=Error} -> + %% Work unit returned an error. + Error; + {NIFRef, Reply} -> + Reply + end; + Other -> + Other + end + end). diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 744e3ba..8e92514 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -43,6 +43,7 @@ -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-compiel(export_all). -endif. -define(API_VERSION, 1). @@ -50,10 +51,13 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). +-record(pass, {session :: wterl:session(), + cursor :: wterl:cursor()}). +-type pass() :: #pass{}. + -record(state, {table :: string(), connection :: wterl:connection(), - cursors :: ets:tid(), - session :: wterl:session()}). + passes :: [pass()]}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -81,6 +85,7 @@ capabilities(_, _) -> %% @doc Start the wterl backend -spec start(integer(), config()) -> {ok, state()} | {error, term()}. start(Partition, Config) -> + lager:start(), AppStart = case application:start(wterl) of ok -> @@ -94,34 +99,30 @@ start(Partition, Config) -> case AppStart of ok -> Table = "lsm:wt" ++ integer_to_list(Partition), - establish_connection(Table, Config); + {ok, Connection} = establish_connection(Config), + Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), + {ok, #state{table=Table, connection=Connection, passes=Passes}}; {error, Reason2} -> {error, Reason2} end. %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{session=undefined, cursors=undefined}) -> - ok; -stop(#state{session=Session, cursors=undefined}) -> - ok = wterl:session_close(Session); -stop(#state{session=Session, cursors=Cursors}=State) -> - ets:foldl(fun({_Table, Cursor}, _) -> - ok = wterl:cursor_close(Cursor) - end, true, Cursors), - ets:delete(Cursors), - stop(State#state{session=Session, cursors=undefined}). +stop(#state{passes=Passes}) -> + lists:foreach(fun({Session, Cursor}) -> + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end, Passes), + ok. %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{session=undefined}=State) -> - get(Bucket, Key, establish_session(State)); -get(Bucket, Key, #state{session=Session, table=Table}=State0) -> +get(Bucket, Key, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, Key), - {Cursor, State} = cursor(Session, Table, get, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_search(Cursor, WTKey) of {ok, Value} -> {ok, Value, State}; @@ -139,11 +140,9 @@ get(Bucket, Key, #state{session=Session, table=Table}=State0) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> - put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State)); -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) -> +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - {Cursor, State} = cursor(Session, Table, put, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_insert(Cursor, WTKey, Val) of ok -> {ok, State}; @@ -158,11 +157,9 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> - delete(Bucket, Key, IndexSpecs, establish_session(State)); -delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) -> +delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, Key), - {Cursor, State} = cursor(Session, Table, delete, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_remove(Cursor, WTKey) of ok -> {ok, State}; @@ -175,12 +172,12 @@ delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) -> any(), [], state()) -> {ok, any()} | {async, fun()}. -fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=undefined}=State) -> - fold_buckets(FoldBucketsFun, Acc, Opts, establish_session(State)); -fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), FoldFun = fold_buckets_fun(FoldBucketsFun), BucketFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -193,7 +190,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -209,9 +207,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{session=undefined}=State) -> - fold_keys(FoldKeysFun, Acc, Opts, establish_session(State)); -fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> %% Figure out how we should limit the fold: by bucket, by %% secondary index, or neither (fold across everything.) Bucket = lists:keyfind(bucket, 1, Opts), @@ -224,10 +220,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> true -> undefined end, + {ok, Connection} = wterl_conn:get(), + %% Set up the fold... FoldFun = fold_keys_fun(FoldKeysFun, Limiter), KeyFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -238,7 +237,8 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -254,13 +254,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{session=undefined}=State) -> - fold_objects(FoldObjectsFun, Acc, Opts, establish_session(State)); -fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), Bucket = proplists:get_value(bucket, Opts), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -271,7 +271,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -284,9 +285,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{session=undefined}=State) -> - drop(establish_session(State)); -drop(#state{table=Table, session=Session}=State) -> +drop(#state{passes=Passes, table=Table}=State) -> + {Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:session_truncate(Session, Table) of ok -> {ok, State}; @@ -297,10 +297,9 @@ drop(#state{table=Table, session=Session}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{session=undefined}=State) -> - is_empty(establish_session(State)); -is_empty(#state{table=Table, session=Session}) -> - {ok, Cursor} = wterl:cursor_open(Session, Table), +is_empty(#state{passes=Passes}) -> + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), + wterl:cursor_reset(Cursor), try not_found =:= wterl:cursor_next(Cursor) after @@ -309,10 +308,8 @@ is_empty(#state{table=Table, session=Session}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{session=undefined}=State) -> - status(establish_session(State)); -status(#state{table=Table, session=Session}) -> - {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), +status(#state{passes=Passes}) -> + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), try Stats = fetch_status(Cursor), [{stats, Stats}] @@ -330,21 +327,8 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -cursor(Session, Table, Type, #state{cursors=undefined}=State) -> - Cursors = ets:new(?MODULE, [{read_concurrency, true}]), - cursor(Session, Table, Type, State#state{cursors=Cursors}); -cursor(Session, Table, Type, #state{cursors=Cursors}=State) -> - case ets:lookup(Cursors, {Type, Table}) of - [{{_Type, Table}, Cursor}] -> - {Cursor, State}; - _ -> - {ok, Cursor} = wterl:cursor_open(Session, Table), - ets:insert(Cursors, {Table, Cursor}), - {Cursor, State} - end. - %% @private -establish_connection(Table, Config) -> +establish_connection(Config) -> %% Get the data root directory case app_helper:get_prop_or_env(data_root, Config, wterl) of undefined -> @@ -373,19 +357,33 @@ establish_connection(Table, Config) -> %"salvage", "verify", "write", "evict", "lsm" ]) ] ++ proplists:get_value(wterl, Config, [])), % sec - %%lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]), + lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]), case wterl_conn:open(DataRoot, Opts) of {ok, Connection} -> - {ok, #state{table=Table, connection=Connection}}; + {ok, Connection}; {error, Reason2} -> lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), {error, Reason2} end end. +establish_passes(Count, Connection, Table) + when is_number(Count), Count > 0 -> + establish_passes(Count, Connection, Table, []). + +establish_passes(Count, Connection, Table, Acc) + when Count > 0 -> + case Count > 1 of + true -> + {ok, Session} = establish_session(Connection, Table), + {ok, Cursor} = wterl:cursor_open(Session, Table), + [{Session, Cursor} | establish_passes(Count - 1, Connection, Table, Acc)]; + false -> + Acc + end. + %% @private -establish_session(#state{table=Table, session=undefined}=State) -> - {ok, Connection} = wterl_conn:get(), +establish_session(Connection, Table) -> case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of {ok, Session} -> SessionOpts = @@ -398,17 +396,15 @@ establish_session(#state{table=Table, session=undefined}=State) -> {lsm_bloom_config, [{leaf_page_max, "10MB"}]} ], case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of ok -> - State#state{session=Session}; + {ok, Session}; {error, Reason} -> lager:error("Failed to start wterl backend: ~p\n", [Reason]), - State + {error, Reason} end; {error, Reason} -> lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), - State - end; -establish_session(State) -> - State. + {error, Reason} + end. %% @private %% Return a function to fold over the buckets on this backend @@ -555,7 +551,7 @@ size_cache(RequestedSize) -> "1GB" end, application:set_env(wterl, cache_size, FinalGuess), - lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), + lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), FinalGuess; Value when is_list(Value) -> Value; diff --git a/src/temp_riak_kv_backend.erl b/src/temp_riak_kv_backend.erl index 5c27461..c41a38d 100644 --- a/src/temp_riak_kv_backend.erl +++ b/src/temp_riak_kv_backend.erl @@ -272,13 +272,16 @@ empty_check({Backend, State}) -> }. setup({BackendMod, Config}) -> - lager:start(), + application:start(lager), application:start(sasl), application:start(os_mon), {ok, S} = BackendMod:start(42, Config), {BackendMod, S}. cleanup({BackendMod, S}) -> - ok = BackendMod:stop(S). + ok = BackendMod:stop(S), + application:stop(lager), + application:stop(sasl), + application:stop(os_mon). -endif. % TEST diff --git a/src/wterl.erl b/src/wterl.erl index dc19fd5..2eb18dd 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -63,6 +63,8 @@ fold_keys/3, fold/3]). +-include("async_nif.hrl"). + -ifdef(TEST). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -103,147 +105,262 @@ init() -> erlang:load_nif(filename:join(PrivDir, atom_to_list(?MODULE)), 0). -spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}. -conn_open(_HomeDir, _Config) -> +conn_open(HomeDir, Config) -> + ?ASYNC_NIF_CALL(fun conn_open_nif/3, [HomeDir, Config]). + +-spec conn_open_nif(reference(), string(), config()) -> {ok, connection()} | {error, term()}. +conn_open_nif(_AsyncRef, _HomeDir, _Config) -> ?nif_stub. -spec conn_close(connection()) -> ok | {error, term()}. -conn_close(_ConnRef) -> +conn_close(ConnRef) -> + ?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]). + +-spec conn_close_nif(reference(), connection()) -> ok | {error, term()}. +conn_close_nif(_AsyncRef, _ConnRef) -> ?nif_stub. -spec session_open(connection()) -> {ok, session()} | {error, term()}. +-spec session_open(connection(), config()) -> {ok, session()} | {error, term()}. session_open(ConnRef) -> session_open(ConnRef, ?EMPTY_CONFIG). +session_open(ConnRef, Config) -> + ?ASYNC_NIF_CALL(fun session_open_nif/3, [ConnRef, Config]). --spec session_open(connection(), config()) -> {ok, session()} | {error, term()}. -session_open(_ConnRef, _Config) -> +-spec session_open_nif(reference(), connection(), config()) -> {ok, session()} | {error, term()}. +session_open_nif(_AsyncRef, _ConnRef, _Config) -> ?nif_stub. -spec session_close(session()) -> ok | {error, term()}. -session_close(_Ref) -> +session_close(Ref) -> + ?ASYNC_NIF_CALL(fun session_close_nif/2, [Ref]). + +-spec session_close_nif(reference(), session()) -> ok | {error, term()}. +session_close_nif(_AsyncRef, _Ref) -> ?nif_stub. -spec session_create(session(), string()) -> ok | {error, term()}. -spec session_create(session(), string(), config()) -> ok | {error, term()}. session_create(Ref, Name) -> session_create(Ref, Name, ?EMPTY_CONFIG). -session_create(_Ref, _Name, _Config) -> +session_create(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_create_nif/4, [Ref, Name, Config]). + +-spec session_create_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_create_nif(_AsyncNif, _Ref, _Name, _Config) -> ?nif_stub. -spec session_drop(session(), string()) -> ok | {error, term()}. -spec session_drop(session(), string(), config()) -> ok | {error, term()}. session_drop(Ref, Name) -> session_drop(Ref, Name, ?EMPTY_CONFIG). -session_drop(_Ref, _Name, _Config) -> +session_drop(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_drop_nif/4, [Ref, Name, Config]). + +-spec session_drop_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_drop_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec session_delete(session(), string(), key()) -> ok | {error, term()}. -session_delete(_Ref, _Table, _Key) -> +session_delete(Ref, Table, Key) -> + ?ASYNC_NIF_CALL(fun session_delete_nif/4, [Ref, Table, Key]). + +-spec session_delete_nif(reference(), session(), string(), key()) -> ok | {error, term()}. +session_delete_nif(_AsyncRef, _Ref, _Table, _Key) -> ?nif_stub. -spec session_get(session(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -session_get(_Ref, _Table, _Key) -> +session_get(Ref, Table, Key) -> + ?ASYNC_NIF_CALL(fun session_get_nif/4, [Ref, Table, Key]). + +-spec session_get_nif(reference(), session(), string(), key()) -> {ok, value()} | not_found | {error, term()}. +session_get_nif(_AsyncRef, _Ref, _Table, _Key) -> ?nif_stub. -spec session_put(session(), string(), key(), value()) -> ok | {error, term()}. -session_put(_Ref, _Table, _Key, _Value) -> +session_put(Ref, Table, Key, Value) -> + ?ASYNC_NIF_CALL(fun session_put_nif/5, [Ref, Table, Key, Value]). + +-spec session_put_nif(reference(), session(), string(), key(), value()) -> ok | {error, term()}. +session_put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) -> ?nif_stub. -spec session_rename(session(), string(), string()) -> ok | {error, term()}. -spec session_rename(session(), string(), string(), config()) -> ok | {error, term()}. session_rename(Ref, OldName, NewName) -> session_rename(Ref, OldName, NewName, ?EMPTY_CONFIG). -session_rename(_Ref, _OldName, _NewName, _Config) -> +session_rename(Ref, OldName, NewName, Config) -> + ?ASYNC_NIF_CALL(fun session_rename_nif/5, [Ref, OldName, NewName, Config]). + +-spec session_rename_nif(reference(), session(), string(), string(), config()) -> ok | {error, term()}. +session_rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) -> ?nif_stub. -spec session_salvage(session(), string()) -> ok | {error, term()}. -spec session_salvage(session(), string(), config()) -> ok | {error, term()}. session_salvage(Ref, Name) -> session_salvage(Ref, Name, ?EMPTY_CONFIG). -session_salvage(_Ref, _Name, _Config) -> +session_salvage(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_salvage_nif/4, [Ref, Name, Config]). + +-spec session_salvage_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_salvage_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec session_checkpoint(session()) -> ok | {error, term()}. -spec session_checkpoint(session(), config()) -> ok | {error, term()}. session_checkpoint(_Ref) -> session_checkpoint(_Ref, ?EMPTY_CONFIG). -session_checkpoint(_Ref, _Config) -> +session_checkpoint(Ref, Config) -> + ?ASYNC_NIF_CALL(fun session_checkpoint_nif/3, [Ref, Config]). + +-spec session_checkpoint_nif(reference(), session(), config()) -> ok | {error, term()}. +session_checkpoint_nif(_AsyncRef, _Ref, _Config) -> ?nif_stub. -spec session_truncate(session(), string()) -> ok | {error, term()}. -spec session_truncate(session(), string(), config()) -> ok | {error, term()}. session_truncate(Ref, Name) -> session_truncate(Ref, Name, ?EMPTY_CONFIG). -session_truncate(_Ref, _Name, _Config) -> +session_truncate(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_truncate_nif/4, [Ref, Name, Config]). + +-spec session_truncate_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_truncate_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec session_upgrade(session(), string()) -> ok | {error, term()}. -spec session_upgrade(session(), string(), config()) -> ok | {error, term()}. session_upgrade(Ref, Name) -> session_upgrade(Ref, Name, ?EMPTY_CONFIG). -session_upgrade(_Ref, _Name, _Config) -> +session_upgrade(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_upgrade_nif/4, [Ref, Name, Config]). + +-spec session_upgrade_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_upgrade_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec session_verify(session(), string()) -> ok | {error, term()}. -spec session_verify(session(), string(), config()) -> ok | {error, term()}. session_verify(Ref, Name) -> session_verify(Ref, Name, ?EMPTY_CONFIG). -session_verify(_Ref, _Name, _Config) -> +session_verify(Ref, Name, Config) -> + ?ASYNC_NIF_CALL(fun session_verify_nif/4, [Ref, Name, Config]). + +-spec session_verify_nif(reference(), session(), string(), config()) -> ok | {error, term()}. +session_verify_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec cursor_open(session(), string()) -> {ok, cursor()} | {error, term()}. -cursor_open(_Ref, _Table) -> +cursor_open(Ref, Table) -> + ?ASYNC_NIF_CALL(fun cursor_open_nif/3, [Ref, Table]). + +-spec cursor_open_nif(reference(), session(), string()) -> {ok, cursor()} | {error, term()}. +cursor_open_nif(_AsyncRef, _Ref, _Table) -> ?nif_stub. -spec cursor_close(cursor()) -> ok | {error, term()}. -cursor_close(_Cursor) -> +cursor_close(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]). + +-spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}. +cursor_close_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_next(_Cursor) -> +cursor_next(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]). + +-spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_next_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_next_key(_Cursor) -> +cursor_next_key(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]). + +-spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_next_key_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_next_value(_Cursor) -> +cursor_next_value(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]). + +-spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_next_value_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_prev(_Cursor) -> +cursor_prev(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]). + +-spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_prev_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_prev_key(_Cursor) -> +cursor_prev_key(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]). + +-spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_prev_key_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_prev_value(_Cursor) -> +cursor_prev_value(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]). + +-spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_prev_value_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search(_Cursor, _Key) -> +cursor_search(Cursor, Key) -> + ?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]). + +-spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search_near(_Cursor, _Key) -> +cursor_search_near(Cursor, Key) -> + ?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]). + +-spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_near_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -spec cursor_reset(cursor()) -> ok | {error, term()}. -cursor_reset(_Cursor) -> +cursor_reset(Cursor) -> + ?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]). + +-spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}. +cursor_reset_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}. -cursor_insert(_Cursor, _Key, _Value) -> +cursor_insert(Cursor, Key, Value) -> + ?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]). + +-spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}. -cursor_update(_Cursor, _Key, _Value) -> +cursor_update(Cursor, Key, Value) -> + ?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]). + +-spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_remove(cursor(), key()) -> ok | {error, term()}. -cursor_remove(_Cursor, _Key) -> +cursor_remove(Cursor, Key) -> + ?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]). + +-spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}. +cursor_remove_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()). @@ -371,7 +488,9 @@ config_to_bin([{Key, Value} | Rest], Acc) -> -define(TEST_DATA_DIR, "test/wterl.basic"). open_test_conn(DataDir) -> - ?assertCmd("rm -rf "++DataDir), + {ok, CWD} = file:get_cwd(), + ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), + ?cmd("rm -rf "++DataDir), ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]), {ok, ConnRef} = conn_open(DataDir, OpenConfig), @@ -637,9 +756,11 @@ prop_put_delete() -> ?LET({Keys, Values}, {keys(), values()}, ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), begin - DataDir = "/tmp/wterl.putdelete.qc", + DataDir = "test/wterl.putdelete.qc", Table = "table:eqc", - ?cmd("rm -rf "++DataDir), + {ok, CWD} = file:get_cwd(), + ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), + ?cmd("rm -rf "++DataDir), ok = filelib:ensure_dir(filename:join(DataDir, "x")), Cfg = wterl:config_to_bin([{create,true}]), {ok, Conn} = wterl:conn_open(DataDir, Cfg),