This is a different approach from async2, instead of a single queue

and many workers this has a queue per scheduler and a few workers per
queue.
This commit is contained in:
Gregory Burd 2013-04-14 08:44:54 -04:00
parent 456129e7f3
commit 8fe9dc9bad
5 changed files with 242 additions and 223 deletions

View file

@ -28,7 +28,7 @@ extern "C" {
#include "queue.h"
#define ASYNC_NIF_MAX_WORKERS 32
#define ASYNC_NIF_MAX_WORKERS 128
struct async_nif_req_entry {
ERL_NIF_TERM ref, *argv;
@ -40,25 +40,27 @@ struct async_nif_req_entry {
STAILQ_ENTRY(async_nif_req_entry) entries;
};
struct async_nif_work_queue {
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
ErlNifTid tid;
LIST_ENTRY(async_nif_worker_entry) entries;
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
};
struct async_nif_state {
volatile unsigned int req_count;
volatile unsigned int shutdown;
ErlNifMutex *req_mutex;
ErlNifCond *cnd;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
unsigned int req_count;
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
};
struct async_nif_worker_info {
struct async_nif_state *async_nif;
struct async_nif_worker_entry *worker;
unsigned int worker_id;
unsigned int num_queues;
unsigned int next_q;
struct async_nif_work_queue queues[ASYNC_NIF_MAX_WORKERS];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
@ -72,10 +74,12 @@ struct async_nif_worker_info {
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
int scheduler_id = 0; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc--; \
/* argv[1] is the current Erlang (scheduler_id - 1) */ \
const ERL_NIF_TERM *argv = argv_in + 2; \
argc -= 2; \
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
@ -107,7 +111,8 @@ struct async_nif_worker_info {
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
return async_nif_enqueue_req(async_nif, req); \
enif_get_int(env, argv_in[1], &scheduler_id); \
return async_nif_enqueue_req(async_nif, req, scheduler_id); \
}
#define ASYNC_NIF_INIT(name) \
@ -143,7 +148,7 @@ struct async_nif_worker_info {
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req)
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int scheduler_id)
{
/* If we're shutting down return an error term and ignore the request. */
if (async_nif->shutdown) {
@ -151,51 +156,62 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
enif_make_atom(req->env, "shutdown"));
}
/* We manage one request queue per-scheduler thread running in the Erlang VM.
Each request is placed onto the queue based on which schdeuler thread
was processing the request. Work queues are balanced only if requests
arrive from a sufficiently random distribution of Erlang scheduler
threads. */
unsigned int qid = async_nif->next_q; // Keep a local to avoid the race.
struct async_nif_work_queue *q = &async_nif->queues[qid];
async_nif->next_q = (qid + 1) % async_nif->num_queues;
/* Otherwise, add the request to the work queue. */
enif_mutex_lock(async_nif->req_mutex);
STAILQ_INSERT_TAIL(&async_nif->reqs, req, entries);
enif_mutex_lock(q->reqs_mutex);
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
async_nif->req_count++;
//fprintf(stderr, "enqueued %d (%d)\r\n", qid, async_nif->req_count); fflush(stderr);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer. */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"),
enif_make_int(req->env, async_nif->req_count)));
enif_mutex_unlock(async_nif->req_mutex);
enif_cond_broadcast(async_nif->cnd);
enif_mutex_unlock(q->reqs_mutex);
enif_cond_broadcast(q->reqs_cnd);
return reply;
}
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_info *wi = (struct async_nif_worker_info *)arg;
struct async_nif_state *async_nif = wi->async_nif;
unsigned int worker_id = wi->worker_id;
enif_free(arg); // Allocated when starting the thread, now no longer needed.
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
for(;;) {
struct async_nif_req_entry *req = NULL;
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(async_nif->req_mutex);
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(async_nif->req_mutex);
enif_mutex_unlock(q->reqs_mutex);
break;
}
if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) {
if ((req = STAILQ_FIRST(&q->reqs)) == NULL) {
/* Queue is empty, wait for work */
enif_cond_wait(async_nif->cnd, async_nif->req_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
/* `req` is our work request and we hold the req_mutex lock. */
/* At this point, `req` is ours to execute and we hold the reqs_mutex lock. */
do {
/* Take the request off the queue. */
STAILQ_REMOVE(&async_nif->reqs, req, async_nif_req_entry, entries);
//fprintf(stderr, "worker %d queue %d performing req (%d)\r\n", worker_id, (worker_id % async_nif->num_queues), async_nif->req_count); fflush(stderr);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
async_nif->req_count--;
enif_mutex_unlock(async_nif->req_mutex);
enif_mutex_unlock(q->reqs_mutex);
/* Finally, do the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
@ -204,14 +220,14 @@ async_nif_worker_fn(void *arg)
enif_free_env(req->env);
enif_free(req);
/* Review the work queue, start more worker threads if they are needed. */
// TODO: if queue_depth > last_depth && num_workers < MAX, start one up
/* Continue working if more requests are in the queue, otherwise wait
for new work to arrive. */
enif_mutex_lock(async_nif->req_mutex);
if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) {
enif_mutex_unlock(async_nif->req_mutex);
if (STAILQ_EMPTY(&q->reqs)) {
req = NULL;
} else {
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_lock(q->reqs_mutex);
req = STAILQ_FIRST(&q->reqs);
}
} while(req);
@ -228,10 +244,13 @@ async_nif_unload(ErlNifEnv *env)
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
/* Signal the worker threads, stop what you're doing and exit. */
enif_mutex_lock(async_nif->req_mutex);
async_nif->shutdown = 1;
enif_cond_broadcast(async_nif->cnd);
enif_mutex_unlock(async_nif->req_mutex);
/* Wake up any waiting worker threads. */
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
}
/* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) {
@ -239,14 +258,16 @@ async_nif_unload(ErlNifEnv *env)
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
/* We won't get here until all threads have exited.
Patch things up, and carry on. */
enif_mutex_lock(async_nif->req_mutex);
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
/* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL;
STAILQ_FOREACH(req, &async_nif->reqs, entries) {
STAILQ_REMOVE(&async_nif->reqs, STAILQ_LAST(&async_nif->reqs, async_nif_req_entry, entries),
STAILQ_FOREACH(req, &q->reqs, entries) {
STAILQ_REMOVE(&q->reqs, STAILQ_LAST(&q->reqs, async_nif_req_entry, entries),
async_nif_req_entry, entries);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
@ -256,13 +277,7 @@ async_nif_unload(ErlNifEnv *env)
enif_free(req);
async_nif->req_count--;
}
enif_mutex_unlock(async_nif->req_mutex);
bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_cond_destroy(async_nif->cnd);
async_nif->cnd = NULL;
enif_mutex_destroy(async_nif->req_mutex);
async_nif->req_mutex = NULL;
}
bzero(async_nif, sizeof(struct async_nif_state));
enif_free(async_nif);
}
@ -271,7 +286,7 @@ static void *
async_nif_load(void)
{
static int has_init = 0;
int i, num_schedulers;
unsigned int i, j;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
@ -281,53 +296,59 @@ async_nif_load(void)
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
num_schedulers = info.scheduler_threads;
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state));
if (!async_nif)
return NULL;
bzero(async_nif, sizeof(struct async_nif_state));
STAILQ_INIT(&(async_nif->reqs));
async_nif->num_queues = info.scheduler_threads;
async_nif->next_q = 0;
async_nif->req_count = 0;
async_nif->shutdown = 0;
async_nif->req_mutex = enif_mutex_create(NULL);
async_nif->cnd = enif_cond_create(NULL);
/* Setup the requests management. */
async_nif->req_count = 0;
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
}
/* Setup the thread pool management. */
bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the minimum of max workers allowed or number of scheduler threads running. */
unsigned int num_worker_threads = ASYNC_NIF_MAX_WORKERS;
if (num_schedulers < ASYNC_NIF_MAX_WORKERS)
num_worker_threads = num_schedulers;
if (num_worker_threads < 1)
num_worker_threads = 1;
num_worker_threads = ASYNC_NIF_MAX_WORKERS; // TODO: make this dynamic at some point
/* Start the worker threads. */
//unsigned int num_workers = ASYNC_NIF_MAX_WORKERS - (ASYNC_NIF_MAX_WORKERS % async_nif->num_queues);
unsigned int num_workers = async_nif->num_queues;
//unsigned int allocation = 1;
//if (num_workers > async_nif->num_queues) {
// allocation = num_workers / async_nif->num_queues;
//}
for (i = 0; i < num_worker_threads; i++) {
struct async_nif_worker_info *wi;
wi = enif_alloc(sizeof(struct async_nif_worker_info)); // TODO: check
bzero(wi, sizeof(struct async_nif_worker_info));
wi->async_nif = async_nif;
wi->worker = &async_nif->worker_entries[i];
wi->worker_id = i;
for (i = 0; i < num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
//fprintf(stderr, "%d:%d:%d | allocating worker_id %d to queue %d\r\n", num_workers, async_nif->num_queues, allocation, i, i % async_nif->num_queues); fflush(stderr);
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)wi, NULL) != 0) {
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
enif_cond_broadcast(async_nif->cnd);
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_cond_destroy(async_nif->cnd);
async_nif->cnd = NULL;
enif_mutex_destroy(async_nif->req_mutex);
async_nif->req_mutex = NULL;
return NULL;
}
}

View file

@ -30,17 +30,13 @@
#ifdef DEBUG
#include <stdio.h>
#include <stdarg.h>
void debugf(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
fprintf(stderr, "\r\n");
fflush(stderr);
va_end(ap);
}
#define dprint(s, ...) do { \
fprintf(stderr, s, ##__VA_ARGS__); \
fprintf(stderr, "\r\n"); \
fflush(stderr); \
} while(0);
#else
# define debugf(X, ...) {}
# define dprint(s, ...) {}
#endif
static ErlNifResourceType *wterl_conn_RESOURCE;
@ -282,7 +278,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
//debugf("c: %d // %s\ns: %d // %s", config.size, (char *)config.data, (char *)session_config.data, session_config.size);
//dprint("c: %d // %s\ns: %d // %s", config.size, (char *)config.data, (char *)session_config.data, session_config.size);
int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn);
if (rc == 0) {
WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
@ -1203,7 +1199,7 @@ ASYNC_NIF_DECL(
/* We create a separate session here to ensure that operations are thread safe. */
WT_CONNECTION *conn = args->conn_handle->conn;
WT_SESSION *session = NULL;
//debugf("cursor open: %s", (char *)args->conn_handle->session_config);
//dprint("cursor open: %s", (char *)args->conn_handle->session_config);
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -1823,36 +1819,36 @@ on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM
static ErlNifFunc nif_funcs[] =
{
{"checkpoint_nif", 3, wterl_checkpoint},
{"conn_close_nif", 2, wterl_conn_close},
{"conn_open_nif", 4, wterl_conn_open},
{"create_nif", 4, wterl_create},
{"delete_nif", 4, wterl_delete},
{"drop_nif", 4, wterl_drop},
{"get_nif", 4, wterl_get},
{"put_nif", 5, wterl_put},
{"rename_nif", 5, wterl_rename},
{"salvage_nif", 4, wterl_salvage},
// TODO: {"txn_begin", 3, wterl_txn_begin},
// TODO: {"txn_commit", 3, wterl_txn_commit},
// TODO: {"txn_abort", 3, wterl_txn_abort},
{"truncate_nif", 6, wterl_truncate},
{"upgrade_nif", 4, wterl_upgrade},
{"verify_nif", 4, wterl_verify},
{"cursor_close_nif", 2, wterl_cursor_close},
{"cursor_insert_nif", 4, wterl_cursor_insert},
{"cursor_next_key_nif", 2, wterl_cursor_next_key},
{"cursor_next_nif", 2, wterl_cursor_next},
{"cursor_next_value_nif", 2, wterl_cursor_next_value},
{"cursor_open_nif", 4, wterl_cursor_open},
{"cursor_prev_key_nif", 2, wterl_cursor_prev_key},
{"cursor_prev_nif", 2, wterl_cursor_prev},
{"cursor_prev_value_nif", 2, wterl_cursor_prev_value},
{"cursor_remove_nif", 3, wterl_cursor_remove},
{"cursor_reset_nif", 2, wterl_cursor_reset},
{"cursor_search_near_nif", 3, wterl_cursor_search_near},
{"cursor_search_nif", 3, wterl_cursor_search},
{"cursor_update_nif", 4, wterl_cursor_update},
{"checkpoint_nif", 4, wterl_checkpoint},
{"conn_close_nif", 3, wterl_conn_close},
{"conn_open_nif", 5, wterl_conn_open},
{"create_nif", 5, wterl_create},
{"delete_nif", 5, wterl_delete},
{"drop_nif", 5, wterl_drop},
{"get_nif", 5, wterl_get},
{"put_nif", 6, wterl_put},
{"rename_nif", 6, wterl_rename},
{"salvage_nif", 5, wterl_salvage},
// TODO: {"txn_begin", 4, wterl_txn_begin},
// TODO: {"txn_commit", 4, wterl_txn_commit},
// TODO: {"txn_abort", 4, wterl_txn_abort},
{"truncate_nif", 7, wterl_truncate},
{"upgrade_nif", 5, wterl_upgrade},
{"verify_nif", 5, wterl_verify},
{"cursor_close_nif", 3, wterl_cursor_close},
{"cursor_insert_nif", 5, wterl_cursor_insert},
{"cursor_next_key_nif", 3, wterl_cursor_next_key},
{"cursor_next_nif", 3, wterl_cursor_next},
{"cursor_next_value_nif", 3, wterl_cursor_next_value},
{"cursor_open_nif", 5, wterl_cursor_open},
{"cursor_prev_key_nif", 3, wterl_cursor_prev_key},
{"cursor_prev_nif", 3, wterl_cursor_prev},
{"cursor_prev_value_nif", 3, wterl_cursor_prev_value},
{"cursor_remove_nif", 4, wterl_cursor_remove},
{"cursor_reset_nif", 3, wterl_cursor_reset},
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
{"cursor_search_nif", 4, wterl_cursor_search},
{"cursor_update_nif", 5, wterl_cursor_update},
};
ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload);

View file

@ -24,7 +24,8 @@
-define(ASYNC_NIF_CALL(Fun, Args),
begin
NIFRef = erlang:make_ref(),
case erlang:apply(Fun, [NIFRef|Args]) of
Id = erlang:system_info(scheduler_id) - 1,
case erlang:apply(Fun, [NIFRef|[Id|Args]]) of
{ok, {enqueued, _QDepth}} ->
receive
{NIFRef, {error, shutdown}=Error} ->

View file

@ -321,13 +321,14 @@ callback(_Ref, _Msg, State) ->
%% ===================================================================
%% @private
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
2 * (RingSize * erlang:system_info(schedulers)).
max_sessions(_Config) -> % TODO:
8192.
%% RingSize =
%% case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
%% undefined -> 1024;
%% Size -> Size
%% end,
%% 2 * (RingSize * erlang:system_info(schedulers)).
%% @private
establish_utility_cursors(Connection, Table) ->

View file

@ -115,20 +115,20 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) ->
-spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
conn_open(HomeDir, ConnectionConfig, SessionConfig) ->
?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir,
?ASYNC_NIF_CALL(fun conn_open_nif/5, [HomeDir,
config_to_bin(ConnectionConfig),
config_to_bin(SessionConfig)]).
-spec conn_open_nif(reference(), string(), config(), config()) -> {ok, connection()} | {error, term()}.
conn_open_nif(_AsyncRef, _HomeDir, _ConnectionConfig, _SessionConfig) ->
-spec conn_open_nif(reference(), non_neg_integer(), string(), config(), config()) -> {ok, connection()} | {error, term()}.
conn_open_nif(_AsyncRef, _SchedulerId, _HomeDir, _ConnectionConfig, _SessionConfig) ->
?nif_stub.
-spec connection_close(connection()) -> ok | {error, term()}.
connection_close(ConnRef) ->
?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]).
?ASYNC_NIF_CALL(fun conn_close_nif/3, [ConnRef]).
-spec conn_close_nif(reference(), connection()) -> ok | {error, term()}.
conn_close_nif(_AsyncRef, _ConnRef) ->
-spec conn_close_nif(reference(), non_neg_integer(), connection()) -> ok | {error, term()}.
conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) ->
?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}.
@ -136,10 +136,10 @@ conn_close_nif(_AsyncRef, _ConnRef) ->
create(Ref, Name) ->
create(Ref, Name, []).
create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun create_nif/5, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) ->
-spec create_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) ->
?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}.
@ -147,34 +147,34 @@ create_nif(_AsyncNif, _Ref, _Name, _Config) ->
drop(Ref, Name) ->
drop(Ref, Name, []).
drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun drop_nif/5, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
-spec drop_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
?nif_stub.
-spec delete(connection(), string(), key()) -> ok | {error, term()}.
delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]).
?ASYNC_NIF_CALL(fun delete_nif/5, [Ref, Table, Key]).
-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
-spec delete_nif(reference(), non_neg_integer(), connection(), string(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) ->
?nif_stub.
-spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]).
?ASYNC_NIF_CALL(fun get_nif/5, [Ref, Table, Key]).
-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _Ref, _Table, _Key) ->
-spec get_nif(reference(), non_neg_integer(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) ->
?nif_stub.
-spec put(connection(), string(), key(), value()) -> ok | {error, term()}.
put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]).
?ASYNC_NIF_CALL(fun put_nif/6, [Ref, Table, Key, Value]).
-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
-spec put_nif(reference(), non_neg_integer(), connection(), string(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) ->
?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}.
@ -182,10 +182,10 @@ put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun rename_nif/6, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
-spec rename_nif(reference(), non_neg_integer(), connection(), string(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) ->
?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}.
@ -193,10 +193,10 @@ rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
salvage(Ref, Name) ->
salvage(Ref, Name, []).
salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun salvage_nif/5, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
-spec salvage_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
?nif_stub.
-spec checkpoint(connection()) -> ok | {error, term()}.
@ -204,10 +204,10 @@ salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
checkpoint(_Ref) ->
checkpoint(_Ref, []).
checkpoint(Ref, Config) ->
?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun checkpoint_nif/4, [Ref, config_to_bin(Config)]).
-spec checkpoint_nif(reference(), connection(), config()) -> ok | {error, term()}.
checkpoint_nif(_AsyncRef, _Ref, _Config) ->
-spec checkpoint_nif(reference(), non_neg_integer(), connection(), config()) -> ok | {error, term()}.
checkpoint_nif(_AsyncRef, _SchedulerId, _Ref, _Config) ->
?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}.
@ -221,10 +221,10 @@ truncate(Ref, Name, Config) ->
truncate(Ref, Name, Start, Stop) ->
truncate(Ref, Name, Start, Stop, []).
truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun truncate_nif/7, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
-spec truncate_nif(reference(), non_neg_integer(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}.
@ -232,10 +232,10 @@ truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
upgrade(Ref, Name) ->
upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun upgrade_nif/5, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
-spec upgrade_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}.
@ -243,10 +243,10 @@ upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
verify(Ref, Name) ->
verify(Ref, Name, []).
verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun verify_nif/5, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
-spec verify_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}.
@ -254,114 +254,114 @@ verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun cursor_open_nif/5, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
-spec cursor_open_nif(reference(), non_neg_integer(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Config) ->
?nif_stub.
-spec cursor_close(cursor()) -> ok | {error, term()}.
cursor_close(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_close_nif/3, [Cursor]).
-spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_close_nif(_AsyncRef, _Cursor) ->
-spec cursor_close_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}.
cursor_close_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_nif/3, [Cursor]).
-spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next_nif(_AsyncRef, _Cursor) ->
-spec cursor_next_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_key_nif/3, [Cursor]).
-spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key_nif(_AsyncRef, _Cursor) ->
-spec cursor_next_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_value_nif/3, [Cursor]).
-spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value_nif(_AsyncRef, _Cursor) ->
-spec cursor_next_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_nif/3, [Cursor]).
-spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev_nif(_AsyncRef, _Cursor) ->
-spec cursor_prev_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_key_nif/3, [Cursor]).
-spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key_nif(_AsyncRef, _Cursor) ->
-spec cursor_prev_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_value_nif/3, [Cursor]).
-spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value_nif(_AsyncRef, _Cursor) ->
-spec cursor_prev_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_search_nif/4, [Cursor, Key]).
-spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_nif(_AsyncRef, _Cursor, _Key) ->
-spec cursor_search_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
?nif_stub.
-spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_search_near_nif/4, [Cursor, Key]).
-spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near_nif(_AsyncRef, _Cursor, _Key) ->
-spec cursor_search_near_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
?nif_stub.
-spec cursor_reset(cursor()) -> ok | {error, term()}.
cursor_reset(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_reset_nif/3, [Cursor]).
-spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_reset_nif(_AsyncRef, _Cursor) ->
-spec cursor_reset_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}.
cursor_reset_nif(_AsyncRef, _SchedulerId, _Cursor) ->
?nif_stub.
-spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]).
?ASYNC_NIF_CALL(fun cursor_insert_nif/5, [Cursor, Key, Value]).
-spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) ->
-spec cursor_insert_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) ->
?nif_stub.
-spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}.
cursor_update(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]).
?ASYNC_NIF_CALL(fun cursor_update_nif/5, [Cursor, Key, Value]).
-spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) ->
-spec cursor_update_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_update_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) ->
?nif_stub.
-spec cursor_remove(cursor(), key()) -> ok | {error, term()}.
cursor_remove(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_remove_nif/4, [Cursor, Key]).
-spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}.
cursor_remove_nif(_AsyncRef, _Cursor, _Key) ->
-spec cursor_remove_nif(reference(), non_neg_integer(), cursor(), key()) -> ok | {error, term()}.
cursor_remove_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
?nif_stub.
-type fold_keys_fun() :: fun((Key::binary(), any()) -> any()).