Queue and execute work from scheduler threads on other threads to prevent schedulers from sleeping.

This commit is contained in:
Gregory Burd 2013-03-24 21:00:48 -04:00
parent 85b84a5343
commit 9f4e08ca6e
7 changed files with 2265 additions and 631 deletions

282
c_src/async_nif.h Normal file
View file

@ -0,0 +1,282 @@
/*
*
* async_nif: An async thread-pool layer for Erlang's NIF API
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__
#if defined(__cplusplus)
extern "C" {
#endif
/* Redefine this in your NIF implementation before including this file to
change the thread pool size. The maximum number of threads might be
bounded on your OS. For instance, to allow 1,000,000 threads on a Linux
system you must do the following before launching the process.
echo 1000000 > /proc/sys/kernel/threads-max
and for all UNIX systems there will be ulimit maximums. */
#ifndef ASYNC_NIF_MAX_WORKERS
#define ASYNC_NIF_MAX_WORKERS 16
#endif
#include "queue.h"
struct async_nif_req_entry {
ERL_NIF_TERM ref, *argv;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void *priv_data;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, void *, ErlNifPid*, void *);
void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
};
STAILQ_HEAD(reqs, async_nif_req_entry) async_nif_reqs = STAILQ_HEAD_INITIALIZER(async_nif_reqs);
struct async_nif_worker_entry {
ErlNifTid tid;
LIST_ENTRY(async_nif_worker_entry) entries;
};
LIST_HEAD(idle_workers, async_nif_worker_entry) async_nif_idle_workers = LIST_HEAD_INITIALIZER(async_nif_worker);
static volatile unsigned int async_nif_req_count = 0;
static volatile unsigned int async_nif_shutdown = 0;
static ErlNifMutex *async_nif_req_mutex = NULL;
static ErlNifMutex *async_nif_worker_mutex = NULL;
static ErlNifCond *async_nif_cnd = NULL;
static struct async_nif_worker_entry async_nif_worker_entries[ASYNC_NIF_MAX_WORKERS];
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, void *priv_data, ErlNifPid *pid, struct decl ## _args *args) work_block \
static void fn_post_ ## decl (struct decl ## _args *args) { \
do post_block while(0); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc--; \
if (async_nif_shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
do pre_block while(0); \
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
if (!req) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->env = new_env; \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->priv_data = enif_priv_data(env); \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, void*, ErlNifPid*, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
async_nif_enqueue_req(req); \
return enif_make_tuple2(env, enif_make_atom(env, "ok"), \
enif_make_tuple2(env, enif_make_atom(env, "enqueued"), \
enif_make_int(env, async_nif_req_count))); \
}
#define ASYNC_NIF_LOAD() if (async_nif_init() != 0) return -1;
#define ASYNC_NIF_UNLOAD() async_nif_unload();
#define ASYNC_NIF_UPGRADE() async_nif_unload();
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
#define ASYNC_NIF_WORK_ENV new_env
#ifndef PULSE_FORCE_USING_PULSE_SEND_HERE
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#else
#define ASYNC_NIF_REPLY(msg) PULSE_SEND(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#endif
static void async_nif_enqueue_req(struct async_nif_req_entry *r)
{
/* Add the request to the work queue. */
enif_mutex_lock(async_nif_req_mutex);
STAILQ_INSERT_TAIL(&async_nif_reqs, r, entries);
async_nif_req_count++;
enif_mutex_unlock(async_nif_req_mutex);
enif_cond_broadcast(async_nif_cnd);
}
static void *async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
struct async_nif_req_entry *req = NULL;
/*
* Workers are active while there is work on the queue to do and
* only in the idle list when they are waiting on new work.
*/
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(async_nif_req_mutex);
enif_mutex_lock(async_nif_worker_mutex);
LIST_INSERT_HEAD(&async_nif_idle_workers, worker, entries);
enif_mutex_unlock(async_nif_worker_mutex);
check_again_for_work:
if (async_nif_shutdown) { enif_mutex_unlock(async_nif_req_mutex); break; }
if ((req = STAILQ_FIRST(&async_nif_reqs)) == NULL) {
/* Queue is empty, join the list of idle workers and wait for work */
enif_cond_wait(async_nif_cnd, async_nif_req_mutex);
goto check_again_for_work;
} else {
/* `req` is our work request and we hold the lock. */
enif_cond_broadcast(async_nif_cnd);
/* Take the request off the queue. */
STAILQ_REMOVE(&async_nif_reqs, req, async_nif_req_entry, entries); async_nif_req_count--;
/* Now we need to remove this thread from the list of idle threads. */
enif_mutex_lock(async_nif_worker_mutex);
LIST_REMOVE(worker, entries);
/* Release the locks in reverse order that we acquired them,
so as not to self-deadlock. */
enif_mutex_unlock(async_nif_worker_mutex);
enif_mutex_unlock(async_nif_req_mutex);
/* Finally, let's do the work! :) */
req->fn_work(req->env, req->ref, req->priv_data, &req->pid, req->args);
req->fn_post(req->args);
enif_free(req->args);
enif_free_env(req->env);
enif_free(req);
}
}
enif_thread_exit(0);
return 0;
}
static void async_nif_unload(void)
{
unsigned int i;
/* Signal the worker threads, stop what you're doing and exit. */
enif_mutex_lock(async_nif_req_mutex);
async_nif_shutdown = 1;
enif_cond_broadcast(async_nif_cnd);
enif_mutex_unlock(async_nif_req_mutex);
/* Join for the now exiting worker threads. */
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; ++i) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif_worker_entries[i].tid, &exit_value);
}
/* We won't get here until all threads have exited.
Patch things up, and carry on. */
enif_mutex_lock(async_nif_req_mutex);
/* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL;
STAILQ_FOREACH(req, &async_nif_reqs, entries) {
STAILQ_REMOVE(&async_nif_reqs, STAILQ_LAST(&async_nif_reqs, async_nif_req_entry, entries),
async_nif_req_entry, entries);
#ifdef PULSE
PULSE_SEND(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
#else
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
#endif
req->fn_post(req->args);
enif_free(req->args);
enif_free(req);
async_nif_req_count--;
}
enif_mutex_unlock(async_nif_req_mutex);
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL;
enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL;
enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL;
}
static int async_nif_init(void)
{
int i;
/* Don't init more than once. */
if (async_nif_req_mutex) return 0;
async_nif_req_mutex = enif_mutex_create(NULL);
async_nif_worker_mutex = enif_mutex_create(NULL);
async_nif_cnd = enif_cond_create(NULL);
/* Setup the requests management. */
async_nif_req_count = 0;
/* Setup the thread pool management. */
enif_mutex_lock(async_nif_worker_mutex);
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
if (enif_thread_create(NULL, &async_nif_worker_entries[i].tid,
&async_nif_worker_fn, (void*)&async_nif_worker_entries[i], NULL) != 0) {
async_nif_shutdown = 1;
enif_cond_broadcast(async_nif_cnd);
enif_mutex_unlock(async_nif_worker_mutex);
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif_worker_entries[i].tid, &exit_value);
}
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL;
enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL;
enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL;
return -1;
}
}
enif_mutex_unlock(async_nif_worker_mutex);
return 0;
}
#if defined(__cplusplus)
}
#endif
#endif // __ASYNC_NIF_H__

648
c_src/queue.h Normal file
View file

@ -0,0 +1,648 @@
/*-
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 4. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
* $FreeBSD: src/sys/sys/queue.h,v 1.75.2.3 2012/11/17 11:37:26 svnexp Exp $
*/
#ifndef _SYS_QUEUE_H_
#define _SYS_QUEUE_H_
#ifndef __offsetof
#define __offsetof(st, m) \
((size_t) ( (char *)&((st *)0)->m - (char *)0 ))
#endif
#ifndef __containerof
#define __containerof(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - __offsetof(type,member) );})
#endif
/*
* This file defines four types of data structures: singly-linked lists,
* singly-linked tail queues, lists and tail queues.
*
* A singly-linked list is headed by a single forward pointer. The elements
* are singly linked for minimum space and pointer manipulation overhead at
* the expense of O(n) removal for arbitrary elements. New elements can be
* added to the list after an existing element or at the head of the list.
* Elements being removed from the head of the list should use the explicit
* macro for this purpose for optimum efficiency. A singly-linked list may
* only be traversed in the forward direction. Singly-linked lists are ideal
* for applications with large datasets and few or no removals or for
* implementing a LIFO queue.
*
* A singly-linked tail queue is headed by a pair of pointers, one to the
* head of the list and the other to the tail of the list. The elements are
* singly linked for minimum space and pointer manipulation overhead at the
* expense of O(n) removal for arbitrary elements. New elements can be added
* to the list after an existing element, at the head of the list, or at the
* end of the list. Elements being removed from the head of the tail queue
* should use the explicit macro for this purpose for optimum efficiency.
* A singly-linked tail queue may only be traversed in the forward direction.
* Singly-linked tail queues are ideal for applications with large datasets
* and few or no removals or for implementing a FIFO queue.
*
* A list is headed by a single forward pointer (or an array of forward
* pointers for a hash table header). The elements are doubly linked
* so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before
* or after an existing element or at the head of the list. A list
* may be traversed in either direction.
*
* A tail queue is headed by a pair of pointers, one to the head of the
* list and the other to the tail of the list. The elements are doubly
* linked so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before or
* after an existing element, at the head of the list, or at the end of
* the list. A tail queue may be traversed in either direction.
*
* For details on the use of these macros, see the queue(3) manual page.
*
*
* SLIST LIST STAILQ TAILQ
* _HEAD + + + +
* _HEAD_INITIALIZER + + + +
* _ENTRY + + + +
* _INIT + + + +
* _EMPTY + + + +
* _FIRST + + + +
* _NEXT + + + +
* _PREV - + - +
* _LAST - - + +
* _FOREACH + + + +
* _FOREACH_SAFE + + + +
* _FOREACH_REVERSE - - - +
* _FOREACH_REVERSE_SAFE - - - +
* _INSERT_HEAD + + + +
* _INSERT_BEFORE - + - +
* _INSERT_AFTER + + + +
* _INSERT_TAIL - - + +
* _CONCAT - - + +
* _REMOVE_AFTER + - + -
* _REMOVE_HEAD + - + -
* _REMOVE + + + +
* _SWAP + + + +
*
*/
#ifdef QUEUE_MACRO_DEBUG
/* Store the last 2 places the queue element or head was altered */
struct qm_trace {
char * lastfile;
int lastline;
char * prevfile;
int prevline;
};
#define TRACEBUF struct qm_trace trace;
#define TRASHIT(x) do {(x) = (void *)-1;} while (0)
#define QMD_SAVELINK(name, link) void **name = (void *)&(link)
#define QMD_TRACE_HEAD(head) do { \
(head)->trace.prevline = (head)->trace.lastline; \
(head)->trace.prevfile = (head)->trace.lastfile; \
(head)->trace.lastline = __LINE__; \
(head)->trace.lastfile = __FILE__; \
} while (0)
#define QMD_TRACE_ELEM(elem) do { \
(elem)->trace.prevline = (elem)->trace.lastline; \
(elem)->trace.prevfile = (elem)->trace.lastfile; \
(elem)->trace.lastline = __LINE__; \
(elem)->trace.lastfile = __FILE__; \
} while (0)
#else
#define QMD_TRACE_ELEM(elem)
#define QMD_TRACE_HEAD(head)
#define QMD_SAVELINK(name, link)
#define TRACEBUF
#define TRASHIT(x)
#endif /* QUEUE_MACRO_DEBUG */
/*
* Singly-linked List declarations.
*/
#define SLIST_HEAD(name, type) \
struct name { \
struct type *slh_first; /* first element */ \
}
#define SLIST_HEAD_INITIALIZER(head) \
{ NULL }
#define SLIST_ENTRY(type) \
struct { \
struct type *sle_next; /* next element */ \
}
/*
* Singly-linked List functions.
*/
#define SLIST_EMPTY(head) ((head)->slh_first == NULL)
#define SLIST_FIRST(head) ((head)->slh_first)
#define SLIST_FOREACH(var, head, field) \
for ((var) = SLIST_FIRST((head)); \
(var); \
(var) = SLIST_NEXT((var), field))
#define SLIST_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = SLIST_FIRST((head)); \
(var) && ((tvar) = SLIST_NEXT((var), field), 1); \
(var) = (tvar))
#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \
for ((varp) = &SLIST_FIRST((head)); \
((var) = *(varp)) != NULL; \
(varp) = &SLIST_NEXT((var), field))
#define SLIST_INIT(head) do { \
SLIST_FIRST((head)) = NULL; \
} while (0)
#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \
SLIST_NEXT((slistelm), field) = (elm); \
} while (0)
#define SLIST_INSERT_HEAD(head, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \
SLIST_FIRST((head)) = (elm); \
} while (0)
#define SLIST_NEXT(elm, field) ((elm)->field.sle_next)
#define SLIST_REMOVE(head, elm, type, field) do { \
QMD_SAVELINK(oldnext, (elm)->field.sle_next); \
if (SLIST_FIRST((head)) == (elm)) { \
SLIST_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = SLIST_FIRST((head)); \
while (SLIST_NEXT(curelm, field) != (elm)) \
curelm = SLIST_NEXT(curelm, field); \
SLIST_REMOVE_AFTER(curelm, field); \
} \
TRASHIT(*oldnext); \
} while (0)
#define SLIST_REMOVE_AFTER(elm, field) do { \
SLIST_NEXT(elm, field) = \
SLIST_NEXT(SLIST_NEXT(elm, field), field); \
} while (0)
#define SLIST_REMOVE_HEAD(head, field) do { \
SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \
} while (0)
#define SLIST_SWAP(head1, head2, type) do { \
struct type *swap_first = SLIST_FIRST(head1); \
SLIST_FIRST(head1) = SLIST_FIRST(head2); \
SLIST_FIRST(head2) = swap_first; \
} while (0)
/*
* Singly-linked Tail queue declarations.
*/
#define STAILQ_HEAD(name, type) \
struct name { \
struct type *stqh_first;/* first element */ \
struct type **stqh_last;/* addr of last next element */ \
}
#define STAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).stqh_first }
#define STAILQ_ENTRY(type) \
struct { \
struct type *stqe_next; /* next element */ \
}
/*
* Singly-linked Tail queue functions.
*/
#define STAILQ_CONCAT(head1, head2) do { \
if (!STAILQ_EMPTY((head2))) { \
*(head1)->stqh_last = (head2)->stqh_first; \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_INIT((head2)); \
} \
} while (0)
#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL)
#define STAILQ_FIRST(head) ((head)->stqh_first)
#define STAILQ_FOREACH(var, head, field) \
for((var) = STAILQ_FIRST((head)); \
(var); \
(var) = STAILQ_NEXT((var), field))
#define STAILQ_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = STAILQ_FIRST((head)); \
(var) && ((tvar) = STAILQ_NEXT((var), field), 1); \
(var) = (tvar))
#define STAILQ_INIT(head) do { \
STAILQ_FIRST((head)) = NULL; \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_NEXT((tqelm), field) = (elm); \
} while (0)
#define STAILQ_INSERT_HEAD(head, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_FIRST((head)) = (elm); \
} while (0)
#define STAILQ_INSERT_TAIL(head, elm, field) do { \
STAILQ_NEXT((elm), field) = NULL; \
*(head)->stqh_last = (elm); \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_LAST(head, type, field) \
(STAILQ_EMPTY((head)) ? NULL : \
__containerof((head)->stqh_last, struct type, field.stqe_next))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
#define STAILQ_REMOVE(head, elm, type, field) do { \
QMD_SAVELINK(oldnext, (elm)->field.stqe_next); \
if (STAILQ_FIRST((head)) == (elm)) { \
STAILQ_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = STAILQ_FIRST((head)); \
while (STAILQ_NEXT(curelm, field) != (elm)) \
curelm = STAILQ_NEXT(curelm, field); \
STAILQ_REMOVE_AFTER(head, curelm, field); \
} \
TRASHIT(*oldnext); \
} while (0)
#define STAILQ_REMOVE_AFTER(head, elm, field) do { \
if ((STAILQ_NEXT(elm, field) = \
STAILQ_NEXT(STAILQ_NEXT(elm, field), field)) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_REMOVE_HEAD(head, field) do { \
if ((STAILQ_FIRST((head)) = \
STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_SWAP(head1, head2, type) do { \
struct type *swap_first = STAILQ_FIRST(head1); \
struct type **swap_last = (head1)->stqh_last; \
STAILQ_FIRST(head1) = STAILQ_FIRST(head2); \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_FIRST(head2) = swap_first; \
(head2)->stqh_last = swap_last; \
if (STAILQ_EMPTY(head1)) \
(head1)->stqh_last = &STAILQ_FIRST(head1); \
if (STAILQ_EMPTY(head2)) \
(head2)->stqh_last = &STAILQ_FIRST(head2); \
} while (0)
/*
* List declarations.
*/
#define LIST_HEAD(name, type) \
struct name { \
struct type *lh_first; /* first element */ \
}
#define LIST_HEAD_INITIALIZER(head) \
{ NULL }
#define LIST_ENTRY(type) \
struct { \
struct type *le_next; /* next element */ \
struct type **le_prev; /* address of previous next element */ \
}
/*
* List functions.
*/
#if (defined(_KERNEL) && defined(INVARIANTS))
#define QMD_LIST_CHECK_HEAD(head, field) do { \
if (LIST_FIRST((head)) != NULL && \
LIST_FIRST((head))->field.le_prev != \
&LIST_FIRST((head))) \
panic("Bad list head %p first->prev != head", (head)); \
} while (0)
#define QMD_LIST_CHECK_NEXT(elm, field) do { \
if (LIST_NEXT((elm), field) != NULL && \
LIST_NEXT((elm), field)->field.le_prev != \
&((elm)->field.le_next)) \
panic("Bad link elm %p next->prev != elm", (elm)); \
} while (0)
#define QMD_LIST_CHECK_PREV(elm, field) do { \
if (*(elm)->field.le_prev != (elm)) \
panic("Bad link elm %p prev->next != elm", (elm)); \
} while (0)
#else
#define QMD_LIST_CHECK_HEAD(head, field)
#define QMD_LIST_CHECK_NEXT(elm, field)
#define QMD_LIST_CHECK_PREV(elm, field)
#endif /* (_KERNEL && INVARIANTS) */
#define LIST_EMPTY(head) ((head)->lh_first == NULL)
#define LIST_FIRST(head) ((head)->lh_first)
#define LIST_FOREACH(var, head, field) \
for ((var) = LIST_FIRST((head)); \
(var); \
(var) = LIST_NEXT((var), field))
#define LIST_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = LIST_FIRST((head)); \
(var) && ((tvar) = LIST_NEXT((var), field), 1); \
(var) = (tvar))
#define LIST_INIT(head) do { \
LIST_FIRST((head)) = NULL; \
} while (0)
#define LIST_INSERT_AFTER(listelm, elm, field) do { \
QMD_LIST_CHECK_NEXT(listelm, field); \
if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\
LIST_NEXT((listelm), field)->field.le_prev = \
&LIST_NEXT((elm), field); \
LIST_NEXT((listelm), field) = (elm); \
(elm)->field.le_prev = &LIST_NEXT((listelm), field); \
} while (0)
#define LIST_INSERT_BEFORE(listelm, elm, field) do { \
QMD_LIST_CHECK_PREV(listelm, field); \
(elm)->field.le_prev = (listelm)->field.le_prev; \
LIST_NEXT((elm), field) = (listelm); \
*(listelm)->field.le_prev = (elm); \
(listelm)->field.le_prev = &LIST_NEXT((elm), field); \
} while (0)
#define LIST_INSERT_HEAD(head, elm, field) do { \
QMD_LIST_CHECK_HEAD((head), field); \
if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \
LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\
LIST_FIRST((head)) = (elm); \
(elm)->field.le_prev = &LIST_FIRST((head)); \
} while (0)
#define LIST_NEXT(elm, field) ((elm)->field.le_next)
#define LIST_PREV(elm, head, type, field) \
((elm)->field.le_prev == &LIST_FIRST((head)) ? NULL : \
__containerof((elm)->field.le_prev, struct type, field.le_next))
#define LIST_REMOVE(elm, field) do { \
QMD_SAVELINK(oldnext, (elm)->field.le_next); \
QMD_SAVELINK(oldprev, (elm)->field.le_prev); \
QMD_LIST_CHECK_NEXT(elm, field); \
QMD_LIST_CHECK_PREV(elm, field); \
if (LIST_NEXT((elm), field) != NULL) \
LIST_NEXT((elm), field)->field.le_prev = \
(elm)->field.le_prev; \
*(elm)->field.le_prev = LIST_NEXT((elm), field); \
TRASHIT(*oldnext); \
TRASHIT(*oldprev); \
} while (0)
#define LIST_SWAP(head1, head2, type, field) do { \
struct type *swap_tmp = LIST_FIRST((head1)); \
LIST_FIRST((head1)) = LIST_FIRST((head2)); \
LIST_FIRST((head2)) = swap_tmp; \
if ((swap_tmp = LIST_FIRST((head1))) != NULL) \
swap_tmp->field.le_prev = &LIST_FIRST((head1)); \
if ((swap_tmp = LIST_FIRST((head2))) != NULL) \
swap_tmp->field.le_prev = &LIST_FIRST((head2)); \
} while (0)
/*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}
#define TAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).tqh_first }
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}
/*
* Tail queue functions.
*/
#if (defined(_KERNEL) && defined(INVARIANTS))
#define QMD_TAILQ_CHECK_HEAD(head, field) do { \
if (!TAILQ_EMPTY(head) && \
TAILQ_FIRST((head))->field.tqe_prev != \
&TAILQ_FIRST((head))) \
panic("Bad tailq head %p first->prev != head", (head)); \
} while (0)
#define QMD_TAILQ_CHECK_TAIL(head, field) do { \
if (*(head)->tqh_last != NULL) \
panic("Bad tailq NEXT(%p->tqh_last) != NULL", (head)); \
} while (0)
#define QMD_TAILQ_CHECK_NEXT(elm, field) do { \
if (TAILQ_NEXT((elm), field) != NULL && \
TAILQ_NEXT((elm), field)->field.tqe_prev != \
&((elm)->field.tqe_next)) \
panic("Bad link elm %p next->prev != elm", (elm)); \
} while (0)
#define QMD_TAILQ_CHECK_PREV(elm, field) do { \
if (*(elm)->field.tqe_prev != (elm)) \
panic("Bad link elm %p prev->next != elm", (elm)); \
} while (0)
#else
#define QMD_TAILQ_CHECK_HEAD(head, field)
#define QMD_TAILQ_CHECK_TAIL(head, headname)
#define QMD_TAILQ_CHECK_NEXT(elm, field)
#define QMD_TAILQ_CHECK_PREV(elm, field)
#endif /* (_KERNEL && INVARIANTS) */
#define TAILQ_CONCAT(head1, head2, field) do { \
if (!TAILQ_EMPTY(head2)) { \
*(head1)->tqh_last = (head2)->tqh_first; \
(head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
(head1)->tqh_last = (head2)->tqh_last; \
TAILQ_INIT((head2)); \
QMD_TRACE_HEAD(head1); \
QMD_TRACE_HEAD(head2); \
} \
} while (0)
#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL)
#define TAILQ_FIRST(head) ((head)->tqh_first)
#define TAILQ_FOREACH(var, head, field) \
for ((var) = TAILQ_FIRST((head)); \
(var); \
(var) = TAILQ_NEXT((var), field))
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = TAILQ_FIRST((head)); \
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
(var) = (tvar))
#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \
for ((var) = TAILQ_LAST((head), headname); \
(var); \
(var) = TAILQ_PREV((var), headname, field))
#define TAILQ_FOREACH_REVERSE_SAFE(var, head, headname, field, tvar) \
for ((var) = TAILQ_LAST((head), headname); \
(var) && ((tvar) = TAILQ_PREV((var), headname, field), 1); \
(var) = (tvar))
#define TAILQ_INIT(head) do { \
TAILQ_FIRST((head)) = NULL; \
(head)->tqh_last = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
} while (0)
#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \
QMD_TAILQ_CHECK_NEXT(listelm, field); \
if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\
TAILQ_NEXT((elm), field)->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else { \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
} \
TAILQ_NEXT((listelm), field) = (elm); \
(elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \
QMD_TAILQ_CHECK_PREV(listelm, field); \
(elm)->field.tqe_prev = (listelm)->field.tqe_prev; \
TAILQ_NEXT((elm), field) = (listelm); \
*(listelm)->field.tqe_prev = (elm); \
(listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_HEAD(head, elm, field) do { \
QMD_TAILQ_CHECK_HEAD(head, field); \
if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \
TAILQ_FIRST((head))->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
TAILQ_FIRST((head)) = (elm); \
(elm)->field.tqe_prev = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_INSERT_TAIL(head, elm, field) do { \
QMD_TAILQ_CHECK_TAIL(head, field); \
TAILQ_NEXT((elm), field) = NULL; \
(elm)->field.tqe_prev = (head)->tqh_last; \
*(head)->tqh_last = (elm); \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_LAST(head, headname) \
(*(((struct headname *)((head)->tqh_last))->tqh_last))
#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
#define TAILQ_PREV(elm, headname, field) \
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#define TAILQ_REMOVE(head, elm, field) do { \
QMD_SAVELINK(oldnext, (elm)->field.tqe_next); \
QMD_SAVELINK(oldprev, (elm)->field.tqe_prev); \
QMD_TAILQ_CHECK_NEXT(elm, field); \
QMD_TAILQ_CHECK_PREV(elm, field); \
if ((TAILQ_NEXT((elm), field)) != NULL) \
TAILQ_NEXT((elm), field)->field.tqe_prev = \
(elm)->field.tqe_prev; \
else { \
(head)->tqh_last = (elm)->field.tqe_prev; \
QMD_TRACE_HEAD(head); \
} \
*(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \
TRASHIT(*oldnext); \
TRASHIT(*oldprev); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_SWAP(head1, head2, type, field) do { \
struct type *swap_first = (head1)->tqh_first; \
struct type **swap_last = (head1)->tqh_last; \
(head1)->tqh_first = (head2)->tqh_first; \
(head1)->tqh_last = (head2)->tqh_last; \
(head2)->tqh_first = swap_first; \
(head2)->tqh_last = swap_last; \
if ((swap_first = (head1)->tqh_first) != NULL) \
swap_first->field.tqe_prev = &(head1)->tqh_first; \
else \
(head1)->tqh_last = &(head1)->tqh_first; \
if ((swap_first = (head2)->tqh_first) != NULL) \
swap_first->field.tqe_prev = &(head2)->tqh_first; \
else \
(head2)->tqh_last = &(head2)->tqh_first; \
} while (0)
#endif /* !_SYS_QUEUE_H_ */

File diff suppressed because it is too large Load diff

43
src/async_nif.hrl Normal file
View file

@ -0,0 +1,43 @@
%% -------------------------------------------------------------------
%%
%% async_nif: An async thread-pool layer for Erlang's NIF API
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
begin
NIFRef = erlang:make_ref(),
case erlang:apply(Fun, [NIFRef|Args]) of
{ok, {enqueued, QDepth}} ->
[erlang:bump_reductions(10 * QDepth) || is_integer(QDepth), QDepth > 100],
receive
{NIFRef, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{NIFRef, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{NIFRef, Reply} ->
Reply
end;
Other ->
Other
end
end).

View file

@ -43,6 +43,7 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compiel(export_all).
-endif. -endif.
-define(API_VERSION, 1). -define(API_VERSION, 1).
@ -50,10 +51,13 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(pass, {session :: wterl:session(),
cursor :: wterl:cursor()}).
-type pass() :: #pass{}.
-record(state, {table :: string(), -record(state, {table :: string(),
connection :: wterl:connection(), connection :: wterl:connection(),
cursors :: ets:tid(), passes :: [pass()]}).
session :: wterl:session()}).
-type state() :: #state{}. -type state() :: #state{}.
-type config() :: [{atom(), term()}]. -type config() :: [{atom(), term()}].
@ -81,6 +85,7 @@ capabilities(_, _) ->
%% @doc Start the wterl backend %% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}. -spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) -> start(Partition, Config) ->
lager:start(),
AppStart = AppStart =
case application:start(wterl) of case application:start(wterl) of
ok -> ok ->
@ -94,34 +99,30 @@ start(Partition, Config) ->
case AppStart of case AppStart of
ok -> ok ->
Table = "lsm:wt" ++ integer_to_list(Partition), Table = "lsm:wt" ++ integer_to_list(Partition),
establish_connection(Table, Config); {ok, Connection} = establish_connection(Config),
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
{ok, #state{table=Table, connection=Connection, passes=Passes}};
{error, Reason2} -> {error, Reason2} ->
{error, Reason2} {error, Reason2}
end. end.
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{session=undefined, cursors=undefined}) -> stop(#state{passes=Passes}) ->
ok; lists:foreach(fun({Session, Cursor}) ->
stop(#state{session=Session, cursors=undefined}) -> ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session); ok = wterl:session_close(Session)
stop(#state{session=Session, cursors=Cursors}=State) -> end, Passes),
ets:foldl(fun({_Table, Cursor}, _) -> ok.
ok = wterl:cursor_close(Cursor)
end, true, Cursors),
ets:delete(Cursors),
stop(State#state{session=Session, cursors=undefined}).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{session=undefined}=State) -> get(Bucket, Key, #state{passes=Passes}=State) ->
get(Bucket, Key, establish_session(State));
get(Bucket, Key, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
{Cursor, State} = cursor(Session, Table, get, State0), {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_search(Cursor, WTKey) of case wterl:cursor_search(Cursor, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
@ -139,11 +140,9 @@ get(Bucket, Key, #state{session=Session, table=Table}=State0) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State));
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
{Cursor, State} = cursor(Session, Table, put, State0), {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_insert(Cursor, WTKey, Val) of case wterl:cursor_insert(Cursor, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
@ -158,11 +157,9 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
delete(Bucket, Key, IndexSpecs, establish_session(State));
delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
{Cursor, State} = cursor(Session, Table, delete, State0), {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_remove(Cursor, WTKey) of case wterl:cursor_remove(Cursor, WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
@ -175,12 +172,12 @@ delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) ->
any(), any(),
[], [],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=undefined}=State) -> fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
fold_buckets(FoldBucketsFun, Acc, Opts, establish_session(State)); {ok, Connection} = wterl_conn:get(),
fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun), FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder = BucketFolder =
fun() -> fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} -> {error, "No such file or directory"} ->
Acc; Acc;
@ -193,7 +190,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor) ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end end
end end
end, end,
@ -209,9 +207,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}. state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{session=undefined}=State) -> fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
fold_keys(FoldKeysFun, Acc, Opts, establish_session(State));
fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by %% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.) %% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts), Bucket = lists:keyfind(bucket, 1, Opts),
@ -224,10 +220,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
true -> undefined true -> undefined
end, end,
{ok, Connection} = wterl_conn:get(),
%% Set up the fold... %% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder = KeyFolder =
fun() -> fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} -> {error, "No such file or directory"} ->
Acc; Acc;
@ -238,7 +237,8 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor) ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end end
end end
end, end,
@ -254,13 +254,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{session=undefined}=State) -> fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
fold_objects(FoldObjectsFun, Acc, Opts, establish_session(State)); {ok, Connection} = wterl_conn:get(),
fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts), Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder = ObjectFolder =
fun() -> fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} -> {error, "No such file or directory"} ->
Acc; Acc;
@ -271,7 +271,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} -> {break, AccFinal} ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor) ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end end
end end
end, end,
@ -284,9 +285,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{session=undefined}=State) -> drop(#state{passes=Passes, table=Table}=State) ->
drop(establish_session(State)); {Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
drop(#state{table=Table, session=Session}=State) ->
case wterl:session_truncate(Session, Table) of case wterl:session_truncate(Session, Table) of
ok -> ok ->
{ok, State}; {ok, State};
@ -297,10 +297,9 @@ drop(#state{table=Table, session=Session}=State) ->
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{session=undefined}=State) -> is_empty(#state{passes=Passes}) ->
is_empty(establish_session(State)); {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
is_empty(#state{table=Table, session=Session}) -> wterl:cursor_reset(Cursor),
{ok, Cursor} = wterl:cursor_open(Session, Table),
try try
not_found =:= wterl:cursor_next(Cursor) not_found =:= wterl:cursor_next(Cursor)
after after
@ -309,10 +308,8 @@ is_empty(#state{table=Table, session=Session}) ->
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{session=undefined}=State) -> status(#state{passes=Passes}) ->
status(establish_session(State)); {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
status(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
try try
Stats = fetch_status(Cursor), Stats = fetch_status(Cursor),
[{stats, Stats}] [{stats, Stats}]
@ -330,21 +327,8 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
cursor(Session, Table, Type, #state{cursors=undefined}=State) ->
Cursors = ets:new(?MODULE, [{read_concurrency, true}]),
cursor(Session, Table, Type, State#state{cursors=Cursors});
cursor(Session, Table, Type, #state{cursors=Cursors}=State) ->
case ets:lookup(Cursors, {Type, Table}) of
[{{_Type, Table}, Cursor}] ->
{Cursor, State};
_ ->
{ok, Cursor} = wterl:cursor_open(Session, Table),
ets:insert(Cursors, {Table, Cursor}),
{Cursor, State}
end.
%% @private %% @private
establish_connection(Table, Config) -> establish_connection(Config) ->
%% Get the data root directory %% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined -> undefined ->
@ -373,19 +357,33 @@ establish_connection(Table, Config) ->
%"salvage", "verify", "write", "evict", "lsm" %"salvage", "verify", "write", "evict", "lsm"
]) ])
] ++ proplists:get_value(wterl, Config, [])), % sec ] ++ proplists:get_value(wterl, Config, [])), % sec
%%lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]), lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]),
case wterl_conn:open(DataRoot, Opts) of case wterl_conn:open(DataRoot, Opts) of
{ok, Connection} -> {ok, Connection} ->
{ok, #state{table=Table, connection=Connection}}; {ok, Connection};
{error, Reason2} -> {error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2} {error, Reason2}
end end
end. end.
establish_passes(Count, Connection, Table)
when is_number(Count), Count > 0 ->
establish_passes(Count, Connection, Table, []).
establish_passes(Count, Connection, Table, Acc)
when Count > 0 ->
case Count > 1 of
true ->
{ok, Session} = establish_session(Connection, Table),
{ok, Cursor} = wterl:cursor_open(Session, Table),
[{Session, Cursor} | establish_passes(Count - 1, Connection, Table, Acc)];
false ->
Acc
end.
%% @private %% @private
establish_session(#state{table=Table, session=undefined}=State) -> establish_session(Connection, Table) ->
{ok, Connection} = wterl_conn:get(),
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} -> {ok, Session} ->
SessionOpts = SessionOpts =
@ -398,17 +396,15 @@ establish_session(#state{table=Table, session=undefined}=State) ->
{lsm_bloom_config, [{leaf_page_max, "10MB"}]} ], {lsm_bloom_config, [{leaf_page_max, "10MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok -> ok ->
State#state{session=Session}; {ok, Session};
{error, Reason} -> {error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", [Reason]), lager:error("Failed to start wterl backend: ~p\n", [Reason]),
State {error, Reason}
end; end;
{error, Reason} -> {error, Reason} ->
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
State {error, Reason}
end; end.
establish_session(State) ->
State.
%% @private %% @private
%% Return a function to fold over the buckets on this backend %% Return a function to fold over the buckets on this backend
@ -555,7 +551,7 @@ size_cache(RequestedSize) ->
"1GB" "1GB"
end, end,
application:set_env(wterl, cache_size, FinalGuess), application:set_env(wterl, cache_size, FinalGuess),
lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
FinalGuess; FinalGuess;
Value when is_list(Value) -> Value when is_list(Value) ->
Value; Value;

View file

@ -272,13 +272,16 @@ empty_check({Backend, State}) ->
}. }.
setup({BackendMod, Config}) -> setup({BackendMod, Config}) ->
lager:start(), application:start(lager),
application:start(sasl), application:start(sasl),
application:start(os_mon), application:start(os_mon),
{ok, S} = BackendMod:start(42, Config), {ok, S} = BackendMod:start(42, Config),
{BackendMod, S}. {BackendMod, S}.
cleanup({BackendMod, S}) -> cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S). ok = BackendMod:stop(S),
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST -endif. % TEST

View file

@ -63,6 +63,8 @@
fold_keys/3, fold_keys/3,
fold/3]). fold/3]).
-include("async_nif.hrl").
-ifdef(TEST). -ifdef(TEST).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
@ -103,147 +105,262 @@ init() ->
erlang:load_nif(filename:join(PrivDir, atom_to_list(?MODULE)), 0). erlang:load_nif(filename:join(PrivDir, atom_to_list(?MODULE)), 0).
-spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}. -spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}.
conn_open(_HomeDir, _Config) -> conn_open(HomeDir, Config) ->
?ASYNC_NIF_CALL(fun conn_open_nif/3, [HomeDir, Config]).
-spec conn_open_nif(reference(), string(), config()) -> {ok, connection()} | {error, term()}.
conn_open_nif(_AsyncRef, _HomeDir, _Config) ->
?nif_stub. ?nif_stub.
-spec conn_close(connection()) -> ok | {error, term()}. -spec conn_close(connection()) -> ok | {error, term()}.
conn_close(_ConnRef) -> conn_close(ConnRef) ->
?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]).
-spec conn_close_nif(reference(), connection()) -> ok | {error, term()}.
conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub. ?nif_stub.
-spec session_open(connection()) -> {ok, session()} | {error, term()}. -spec session_open(connection()) -> {ok, session()} | {error, term()}.
-spec session_open(connection(), config()) -> {ok, session()} | {error, term()}.
session_open(ConnRef) -> session_open(ConnRef) ->
session_open(ConnRef, ?EMPTY_CONFIG). session_open(ConnRef, ?EMPTY_CONFIG).
session_open(ConnRef, Config) ->
?ASYNC_NIF_CALL(fun session_open_nif/3, [ConnRef, Config]).
-spec session_open(connection(), config()) -> {ok, session()} | {error, term()}. -spec session_open_nif(reference(), connection(), config()) -> {ok, session()} | {error, term()}.
session_open(_ConnRef, _Config) -> session_open_nif(_AsyncRef, _ConnRef, _Config) ->
?nif_stub. ?nif_stub.
-spec session_close(session()) -> ok | {error, term()}. -spec session_close(session()) -> ok | {error, term()}.
session_close(_Ref) -> session_close(Ref) ->
?ASYNC_NIF_CALL(fun session_close_nif/2, [Ref]).
-spec session_close_nif(reference(), session()) -> ok | {error, term()}.
session_close_nif(_AsyncRef, _Ref) ->
?nif_stub. ?nif_stub.
-spec session_create(session(), string()) -> ok | {error, term()}. -spec session_create(session(), string()) -> ok | {error, term()}.
-spec session_create(session(), string(), config()) -> ok | {error, term()}. -spec session_create(session(), string(), config()) -> ok | {error, term()}.
session_create(Ref, Name) -> session_create(Ref, Name) ->
session_create(Ref, Name, ?EMPTY_CONFIG). session_create(Ref, Name, ?EMPTY_CONFIG).
session_create(_Ref, _Name, _Config) -> session_create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_create_nif/4, [Ref, Name, Config]).
-spec session_create_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec session_drop(session(), string()) -> ok | {error, term()}. -spec session_drop(session(), string()) -> ok | {error, term()}.
-spec session_drop(session(), string(), config()) -> ok | {error, term()}. -spec session_drop(session(), string(), config()) -> ok | {error, term()}.
session_drop(Ref, Name) -> session_drop(Ref, Name) ->
session_drop(Ref, Name, ?EMPTY_CONFIG). session_drop(Ref, Name, ?EMPTY_CONFIG).
session_drop(_Ref, _Name, _Config) -> session_drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_drop_nif/4, [Ref, Name, Config]).
-spec session_drop_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec session_delete(session(), string(), key()) -> ok | {error, term()}. -spec session_delete(session(), string(), key()) -> ok | {error, term()}.
session_delete(_Ref, _Table, _Key) -> session_delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun session_delete_nif/4, [Ref, Table, Key]).
-spec session_delete_nif(reference(), session(), string(), key()) -> ok | {error, term()}.
session_delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub. ?nif_stub.
-spec session_get(session(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -spec session_get(session(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
session_get(_Ref, _Table, _Key) -> session_get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun session_get_nif/4, [Ref, Table, Key]).
-spec session_get_nif(reference(), session(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
session_get_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub. ?nif_stub.
-spec session_put(session(), string(), key(), value()) -> ok | {error, term()}. -spec session_put(session(), string(), key(), value()) -> ok | {error, term()}.
session_put(_Ref, _Table, _Key, _Value) -> session_put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun session_put_nif/5, [Ref, Table, Key, Value]).
-spec session_put_nif(reference(), session(), string(), key(), value()) -> ok | {error, term()}.
session_put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub. ?nif_stub.
-spec session_rename(session(), string(), string()) -> ok | {error, term()}. -spec session_rename(session(), string(), string()) -> ok | {error, term()}.
-spec session_rename(session(), string(), string(), config()) -> ok | {error, term()}. -spec session_rename(session(), string(), string(), config()) -> ok | {error, term()}.
session_rename(Ref, OldName, NewName) -> session_rename(Ref, OldName, NewName) ->
session_rename(Ref, OldName, NewName, ?EMPTY_CONFIG). session_rename(Ref, OldName, NewName, ?EMPTY_CONFIG).
session_rename(_Ref, _OldName, _NewName, _Config) -> session_rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun session_rename_nif/5, [Ref, OldName, NewName, Config]).
-spec session_rename_nif(reference(), session(), string(), string(), config()) -> ok | {error, term()}.
session_rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub. ?nif_stub.
-spec session_salvage(session(), string()) -> ok | {error, term()}. -spec session_salvage(session(), string()) -> ok | {error, term()}.
-spec session_salvage(session(), string(), config()) -> ok | {error, term()}. -spec session_salvage(session(), string(), config()) -> ok | {error, term()}.
session_salvage(Ref, Name) -> session_salvage(Ref, Name) ->
session_salvage(Ref, Name, ?EMPTY_CONFIG). session_salvage(Ref, Name, ?EMPTY_CONFIG).
session_salvage(_Ref, _Name, _Config) -> session_salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_salvage_nif/4, [Ref, Name, Config]).
-spec session_salvage_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec session_checkpoint(session()) -> ok | {error, term()}. -spec session_checkpoint(session()) -> ok | {error, term()}.
-spec session_checkpoint(session(), config()) -> ok | {error, term()}. -spec session_checkpoint(session(), config()) -> ok | {error, term()}.
session_checkpoint(_Ref) -> session_checkpoint(_Ref) ->
session_checkpoint(_Ref, ?EMPTY_CONFIG). session_checkpoint(_Ref, ?EMPTY_CONFIG).
session_checkpoint(_Ref, _Config) -> session_checkpoint(Ref, Config) ->
?ASYNC_NIF_CALL(fun session_checkpoint_nif/3, [Ref, Config]).
-spec session_checkpoint_nif(reference(), session(), config()) -> ok | {error, term()}.
session_checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub. ?nif_stub.
-spec session_truncate(session(), string()) -> ok | {error, term()}. -spec session_truncate(session(), string()) -> ok | {error, term()}.
-spec session_truncate(session(), string(), config()) -> ok | {error, term()}. -spec session_truncate(session(), string(), config()) -> ok | {error, term()}.
session_truncate(Ref, Name) -> session_truncate(Ref, Name) ->
session_truncate(Ref, Name, ?EMPTY_CONFIG). session_truncate(Ref, Name, ?EMPTY_CONFIG).
session_truncate(_Ref, _Name, _Config) -> session_truncate(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_truncate_nif/4, [Ref, Name, Config]).
-spec session_truncate_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_truncate_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec session_upgrade(session(), string()) -> ok | {error, term()}. -spec session_upgrade(session(), string()) -> ok | {error, term()}.
-spec session_upgrade(session(), string(), config()) -> ok | {error, term()}. -spec session_upgrade(session(), string(), config()) -> ok | {error, term()}.
session_upgrade(Ref, Name) -> session_upgrade(Ref, Name) ->
session_upgrade(Ref, Name, ?EMPTY_CONFIG). session_upgrade(Ref, Name, ?EMPTY_CONFIG).
session_upgrade(_Ref, _Name, _Config) -> session_upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_upgrade_nif/4, [Ref, Name, Config]).
-spec session_upgrade_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec session_verify(session(), string()) -> ok | {error, term()}. -spec session_verify(session(), string()) -> ok | {error, term()}.
-spec session_verify(session(), string(), config()) -> ok | {error, term()}. -spec session_verify(session(), string(), config()) -> ok | {error, term()}.
session_verify(Ref, Name) -> session_verify(Ref, Name) ->
session_verify(Ref, Name, ?EMPTY_CONFIG). session_verify(Ref, Name, ?EMPTY_CONFIG).
session_verify(_Ref, _Name, _Config) -> session_verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun session_verify_nif/4, [Ref, Name, Config]).
-spec session_verify_nif(reference(), session(), string(), config()) -> ok | {error, term()}.
session_verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub. ?nif_stub.
-spec cursor_open(session(), string()) -> {ok, cursor()} | {error, term()}. -spec cursor_open(session(), string()) -> {ok, cursor()} | {error, term()}.
cursor_open(_Ref, _Table) -> cursor_open(Ref, Table) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/3, [Ref, Table]).
-spec cursor_open_nif(reference(), session(), string()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table) ->
?nif_stub. ?nif_stub.
-spec cursor_close(cursor()) -> ok | {error, term()}. -spec cursor_close(cursor()) -> ok | {error, term()}.
cursor_close(_Cursor) -> cursor_close(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]).
-spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_close_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next(_Cursor) -> cursor_next(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]).
-spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}. -spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key(_Cursor) -> cursor_next_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]).
-spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}. -spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value(_Cursor) -> cursor_next_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]).
-spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev(_Cursor) -> cursor_prev(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]).
-spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}. -spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key(_Cursor) -> cursor_prev_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]).
-spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}. -spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value(_Cursor) -> cursor_prev_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]).
-spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}. -spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search(_Cursor, _Key) -> cursor_search(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]).
-spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub. ?nif_stub.
-spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}. -spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near(_Cursor, _Key) -> cursor_search_near(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]).
-spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub. ?nif_stub.
-spec cursor_reset(cursor()) -> ok | {error, term()}. -spec cursor_reset(cursor()) -> ok | {error, term()}.
cursor_reset(_Cursor) -> cursor_reset(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]).
-spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_reset_nif(_AsyncRef, _Cursor) ->
?nif_stub. ?nif_stub.
-spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}. -spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert(_Cursor, _Key, _Value) -> cursor_insert(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]).
-spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) ->
?nif_stub. ?nif_stub.
-spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}. -spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}.
cursor_update(_Cursor, _Key, _Value) -> cursor_update(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]).
-spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) ->
?nif_stub. ?nif_stub.
-spec cursor_remove(cursor(), key()) -> ok | {error, term()}. -spec cursor_remove(cursor(), key()) -> ok | {error, term()}.
cursor_remove(_Cursor, _Key) -> cursor_remove(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]).
-spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}.
cursor_remove_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub. ?nif_stub.
-type fold_keys_fun() :: fun((Key::binary(), any()) -> any()). -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()).
@ -371,7 +488,9 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
-define(TEST_DATA_DIR, "test/wterl.basic"). -define(TEST_DATA_DIR, "test/wterl.basic").
open_test_conn(DataDir) -> open_test_conn(DataDir) ->
?assertCmd("rm -rf "++DataDir), {ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))),
OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]), OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]),
{ok, ConnRef} = conn_open(DataDir, OpenConfig), {ok, ConnRef} = conn_open(DataDir, OpenConfig),
@ -637,9 +756,11 @@ prop_put_delete() ->
?LET({Keys, Values}, {keys(), values()}, ?LET({Keys, Values}, {keys(), values()},
?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))), ?FORALL(Ops, eqc_gen:non_empty(list(ops(Keys, Values))),
begin begin
DataDir = "/tmp/wterl.putdelete.qc", DataDir = "test/wterl.putdelete.qc",
Table = "table:eqc", Table = "table:eqc",
?cmd("rm -rf "++DataDir), {ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
ok = filelib:ensure_dir(filename:join(DataDir, "x")), ok = filelib:ensure_dir(filename:join(DataDir, "x")),
Cfg = wterl:config_to_bin([{create,true}]), Cfg = wterl:config_to_bin([{create,true}]),
{ok, Conn} = wterl:conn_open(DataDir, Cfg), {ok, Conn} = wterl:conn_open(DataDir, Cfg),