We don't use the scheduler id from Erlang anymore in async_nif so

stop sending it over.  Allow the user to set a "type" of storage
in their config to either 'table' for btree or 'lsm' for a log
structured merge tree.  Various other cleanup.
This commit is contained in:
Gregory Burd 2013-04-15 00:08:01 -04:00
parent 9ed2137730
commit 5ba491adfa
5 changed files with 201 additions and 196 deletions

View file

@ -33,6 +33,7 @@ extern "C" {
#endif
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 1024
struct async_nif_req_entry {
ERL_NIF_TERM ref, *argv;
@ -48,10 +49,8 @@ struct async_nif_work_queue {
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
unsigned int depth;
#ifdef ASYNC_NIF_STATS
struct async_stats stats;
#endif
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
// TODO: struct async_nif_req_entry items[ASYNC_NIF_WORKER_QUEUE_SIZE];
};
struct async_nif_worker_entry {
@ -81,12 +80,10 @@ struct async_nif_state {
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
int scheduler_id = 0; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
/* argv[1] is the current Erlang (scheduler_id - 1) */ \
const ERL_NIF_TERM *argv = argv_in + 2; \
argc -= 2; \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
@ -118,8 +115,7 @@ struct async_nif_state {
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
enif_get_int(env, argv_in[1], &scheduler_id); \
return async_nif_enqueue_req(async_nif, req, scheduler_id); \
return async_nif_enqueue_req(async_nif, req); \
}
#define ASYNC_NIF_INIT(name) \
@ -155,7 +151,7 @@ struct async_nif_state {
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int scheduler_id)
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req)
{
/* If we're shutting down return an error term and ignore the request. */
if (async_nif->shutdown) {
@ -163,27 +159,26 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
enif_make_atom(req->env, "shutdown"));
}
/* We manage one request queue per-scheduler thread running in the Erlang VM.
Each request is placed onto the queue based on which schdeuler thread
was processing the request. Work queues are balanced only if requests
arrive from a sufficiently random distribution of Erlang scheduler
threads. */
unsigned int qid = async_nif->next_q; // Keep a local to avoid the race.
struct async_nif_work_queue *q = &async_nif->queues[qid];
if (q->depth > 10)
async_nif->next_q = (qid + 1) % async_nif->num_queues;
while (q->depth == ASYNC_NIF_WORKER_QUEUE_SIZE) {
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
}
/* TODO:
if (q->avg_latency > 5) {
async_nif->next_q = (qid + 1) % async_nif->num_queues;
}
*/
/* Otherwise, add the request to the work queue. */
enif_mutex_lock(q->reqs_mutex);
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++;
//fprintf(stderr, "enqueued %d (%d)\r\n", qid, async_nif->req_count); fflush(stderr);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer. */
the req pointer (which will soon become invalid). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"),
enif_make_int(req->env, q->depth)));
enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
@ -212,15 +207,18 @@ async_nif_worker_fn(void *arg)
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
/* At this point, `req` is ours to execute and we hold the reqs_mutex lock. */
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. */
do {
/* Take the request off the queue. */
//fprintf(stderr, "worker %d queue %d performing req (%d)\r\n", worker_id, (worker_id % async_nif->num_queues), async_nif->req_count); fflush(stderr);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
q->depth--;
enif_mutex_unlock(q->reqs_mutex);
/* Wake up another thread working on this queue. */
enif_cond_signal(q->reqs_cnd);
/* Finally, do the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
req->fn_post(req->args);
@ -233,7 +231,6 @@ async_nif_worker_fn(void *arg)
if (STAILQ_EMPTY(&q->reqs)) {
req = NULL;
} else {
enif_cond_signal(q->reqs_cnd);
enif_mutex_lock(q->reqs_mutex);
req = STAILQ_FIRST(&q->reqs);
}

View file

@ -70,8 +70,8 @@ typedef struct {
typedef struct {
WT_CONNECTION *conn;
const char *session_config;
ErlNifMutex *context_mutex;
unsigned int num_contexts;
ErlNifMutex *contexts_mutex;
WterlCtx contexts[ASYNC_NIF_MAX_WORKERS];
} WterlConnHandle;
@ -102,17 +102,17 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION *
*session = ctx->session;
if (*session == NULL) {
/* Create a context for this worker thread to reuse. */
enif_mutex_lock(conn_handle->context_mutex);
enif_mutex_lock(conn_handle->contexts_mutex);
WT_CONNECTION *conn = conn_handle->conn;
int rc = conn->open_session(conn, NULL, conn_handle->session_config, session);
if (rc != 0) {
enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_unlock(conn_handle->contexts_mutex);
return rc;
}
ctx->session = *session;
ctx->cursors = kh_init(cursors);
conn_handle->num_contexts++;
enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_unlock(conn_handle->contexts_mutex);
}
return 0;
}
@ -120,7 +120,7 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION *
/**
* Close all sessions and all cursors open on any objects.
*
* Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex)
* Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex)
*/
void
__close_all_sessions(WterlConnHandle *conn_handle)
@ -142,7 +142,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
}
kh_destroy(cursors, h);
session->close(session, NULL);
memset(&conn_handle->contexts[i], 0, sizeof(WterlCtx));
ctx->session = NULL;
}
}
conn_handle->num_contexts = 0;
@ -151,7 +151,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
/**
* Close cursors open on 'uri' object.
*
* Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex)
* Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex)
*/
void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race?
@ -189,18 +189,18 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
*cursor = (WT_CURSOR*)kh_value(h, itr);
} else {
// key does not exist in hash table, create and insert one
enif_mutex_lock(conn_handle->context_mutex);
enif_mutex_lock(conn_handle->contexts_mutex);
WT_SESSION *session = conn_handle->contexts[worker_id].session;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
if (rc != 0) {
enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_unlock(conn_handle->contexts_mutex);
return rc;
}
char *key = enif_alloc(sizeof(Uri));
if (!key) {
session->close(session, NULL);
enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_unlock(conn_handle->contexts_mutex);
return ENOMEM;
}
memcpy(key, uri, 128);
@ -208,7 +208,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
int itr_status;
itr = kh_put(cursors, h, key, &itr_status);
kh_value(h, itr) = *cursor;
enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_unlock(conn_handle->contexts_mutex);
}
return 0;
}
@ -302,7 +302,7 @@ ASYNC_NIF_DECL(
conn_handle->conn = conn;
conn_handle->num_contexts = 0;
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
conn_handle->context_mutex = enif_mutex_create(NULL);
conn_handle->contexts_mutex = enif_mutex_create(NULL);
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
@ -338,7 +338,7 @@ ASYNC_NIF_DECL(
{ // work
/* Free up the shared sessions and cursors. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_all_sessions(args->conn_handle);
if (args->conn_handle->session_config) {
enif_free((char *)args->conn_handle->session_config);
@ -346,8 +346,8 @@ ASYNC_NIF_DECL(
}
WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_destroy(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
enif_mutex_destroy(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -441,13 +441,13 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
@ -459,7 +459,7 @@ ASYNC_NIF_DECL(
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
/* Note: we must first close all cursors referencing this object or this
@ -472,7 +472,7 @@ ASYNC_NIF_DECL(
rc = session->drop(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -512,7 +512,7 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_cursors_on(args->conn_handle, args->oldname);
ErlNifBinary config;
@ -537,7 +537,7 @@ ASYNC_NIF_DECL(
// TODO: see drop's note, same goes here.
rc = session->rename(session, args->oldname, args->newname, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -577,7 +577,7 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
@ -599,7 +599,7 @@ ASYNC_NIF_DECL(
rc = session->salvage(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -706,13 +706,13 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
@ -725,7 +725,7 @@ ASYNC_NIF_DECL(
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
@ -740,7 +740,7 @@ ASYNC_NIF_DECL(
if (!args->from_first) {
if (!enif_inspect_binary(env, args->start, &start_key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
}
@ -748,7 +748,7 @@ ASYNC_NIF_DECL(
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
/* Position the start cursor at the first record or the specified record. */
@ -758,7 +758,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
} else {
@ -771,7 +771,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
}
@ -779,7 +779,7 @@ ASYNC_NIF_DECL(
if (!args->to_last) {
if (!enif_inspect_binary(env, args->stop, &stop_key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
}
@ -788,7 +788,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
start->close(start);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
/* Position the stop cursor at the last record or the specified record. */
@ -799,7 +799,7 @@ ASYNC_NIF_DECL(
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
} else {
@ -813,7 +813,7 @@ ASYNC_NIF_DECL(
start->close(start);
stop->close(stop);
session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
}
@ -825,7 +825,7 @@ ASYNC_NIF_DECL(
if (start) start->close(start);
if (stop) stop->close(stop);
if (session) session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -862,13 +862,13 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
@ -880,13 +880,13 @@ ASYNC_NIF_DECL(
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
rc = session->upgrade(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -924,13 +924,13 @@ ASYNC_NIF_DECL(
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
enif_mutex_lock(args->conn_handle->contexts_mutex);
__close_all_sessions(args->conn_handle);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
@ -942,13 +942,13 @@ ASYNC_NIF_DECL(
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
return;
}
rc = session->verify(session, args->uri, (const char*)config.data);
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1823,36 +1823,36 @@ on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM
static ErlNifFunc nif_funcs[] =
{
{"checkpoint_nif", 4, wterl_checkpoint},
{"conn_close_nif", 3, wterl_conn_close},
{"conn_open_nif", 5, wterl_conn_open},
{"create_nif", 5, wterl_create},
{"delete_nif", 5, wterl_delete},
{"drop_nif", 5, wterl_drop},
{"get_nif", 5, wterl_get},
{"put_nif", 6, wterl_put},
{"rename_nif", 6, wterl_rename},
{"salvage_nif", 5, wterl_salvage},
// TODO: {"txn_begin", 4, wterl_txn_begin},
// TODO: {"txn_commit", 4, wterl_txn_commit},
// TODO: {"txn_abort", 4, wterl_txn_abort},
{"truncate_nif", 7, wterl_truncate},
{"upgrade_nif", 5, wterl_upgrade},
{"verify_nif", 5, wterl_verify},
{"cursor_close_nif", 3, wterl_cursor_close},
{"cursor_insert_nif", 5, wterl_cursor_insert},
{"cursor_next_key_nif", 3, wterl_cursor_next_key},
{"cursor_next_nif", 3, wterl_cursor_next},
{"cursor_next_value_nif", 3, wterl_cursor_next_value},
{"cursor_open_nif", 5, wterl_cursor_open},
{"cursor_prev_key_nif", 3, wterl_cursor_prev_key},
{"cursor_prev_nif", 3, wterl_cursor_prev},
{"cursor_prev_value_nif", 3, wterl_cursor_prev_value},
{"cursor_remove_nif", 4, wterl_cursor_remove},
{"cursor_reset_nif", 3, wterl_cursor_reset},
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
{"cursor_search_nif", 4, wterl_cursor_search},
{"cursor_update_nif", 5, wterl_cursor_update},
{"checkpoint_nif", 3, wterl_checkpoint},
{"conn_close_nif", 2, wterl_conn_close},
{"conn_open_nif", 4, wterl_conn_open},
{"create_nif", 4, wterl_create},
{"delete_nif", 4, wterl_delete},
{"drop_nif", 4, wterl_drop},
{"get_nif", 4, wterl_get},
{"put_nif", 5, wterl_put},
{"rename_nif", 5, wterl_rename},
{"salvage_nif", 4, wterl_salvage},
// TODO: {"txn_begin", 3, wterl_txn_begin},
// TODO: {"txn_commit", 3, wterl_txn_commit},
// TODO: {"txn_abort", 3, wterl_txn_abort},
{"truncate_nif", 6, wterl_truncate},
{"upgrade_nif", 4, wterl_upgrade},
{"verify_nif", 4, wterl_verify},
{"cursor_close_nif", 2, wterl_cursor_close},
{"cursor_insert_nif", 4, wterl_cursor_insert},
{"cursor_next_key_nif", 2, wterl_cursor_next_key},
{"cursor_next_nif", 2, wterl_cursor_next},
{"cursor_next_value_nif", 2, wterl_cursor_next_value},
{"cursor_open_nif", 4, wterl_cursor_open},
{"cursor_prev_key_nif", 2, wterl_cursor_prev_key},
{"cursor_prev_nif", 2, wterl_cursor_prev},
{"cursor_prev_value_nif", 2, wterl_cursor_prev_value},
{"cursor_remove_nif", 3, wterl_cursor_remove},
{"cursor_reset_nif", 2, wterl_cursor_reset},
{"cursor_search_near_nif", 3, wterl_cursor_search_near},
{"cursor_search_nif", 3, wterl_cursor_search},
{"cursor_update_nif", 4, wterl_cursor_update},
};
ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload);

View file

@ -24,9 +24,8 @@
-define(ASYNC_NIF_CALL(Fun, Args),
begin
NIFRef = erlang:make_ref(),
Id = erlang:system_info(scheduler_id) - 1,
case erlang:apply(Fun, [NIFRef|[Id|Args]]) of
{ok, {enqueued, _QDepth}} ->
case erlang:apply(Fun, [NIFRef|Args]) of
{ok, enqueued} ->
receive
{NIFRef, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.

View file

@ -51,6 +51,7 @@
-define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(),
type :: string(),
connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
@ -94,23 +95,31 @@ start(Partition, Config) ->
case AppStart of
ok ->
{ok, Connection} = establish_connection(Config),
Table = "lsm:wt" ++ integer_to_list(Partition),
Type = wterl:config_value(type, Config, "lsm"),
Table = Type ++ ":wt" ++ integer_to_list(Partition),
TableOpts =
[{block_compressor, "snappy"},
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
],
case Type of
"lsm" ->
[{block_compressor, "snappy"},
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]} ];
"table" ->
[{block_compressor, "snappy"}];
_ ->
[]
end,
case wterl:create(Connection, Table, TableOpts) of
ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, connection=Connection,
{ok, #state{table=Table, type=Type,
connection=Connection,
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
@ -345,7 +354,7 @@ establish_utility_cursors(Connection, Table) ->
end.
%% @private
establish_connection(Config) ->
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
@ -366,7 +375,7 @@ establish_connection(Config) ->
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
%% NOTE: LSM auto-checkpoints, so we don't have too.
%% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec
[wterl:config_value(checkpoint, Config, [{wait, 10}]) || Type =:= "table"],
wterl:config_value(verbose, Config, [
%"ckpt" "block", "shared_cache", "evictserver", "fileops",
%"hazard", "mutex", "read", "readserver", "reconcile",

View file

@ -115,20 +115,20 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) ->
-spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
conn_open(HomeDir, ConnectionConfig, SessionConfig) ->
?ASYNC_NIF_CALL(fun conn_open_nif/5, [HomeDir,
?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir,
config_to_bin(ConnectionConfig),
config_to_bin(SessionConfig)]).
-spec conn_open_nif(reference(), non_neg_integer(), string(), config(), config()) -> {ok, connection()} | {error, term()}.
conn_open_nif(_AsyncRef, _SchedulerId, _HomeDir, _ConnectionConfig, _SessionConfig) ->
-spec conn_open_nif(reference(), string(), config(), config()) -> {ok, connection()} | {error, term()}.
conn_open_nif(_AsyncRef, _HomeDir, _ConnectionConfig, _SessionConfig) ->
?nif_stub.
-spec connection_close(connection()) -> ok | {error, term()}.
connection_close(ConnRef) ->
?ASYNC_NIF_CALL(fun conn_close_nif/3, [ConnRef]).
?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]).
-spec conn_close_nif(reference(), non_neg_integer(), connection()) -> ok | {error, term()}.
conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) ->
-spec conn_close_nif(reference(), connection()) -> ok | {error, term()}.
conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}.
@ -136,10 +136,10 @@ conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) ->
create(Ref, Name) ->
create(Ref, Name, []).
create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/5, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) ->
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}.
@ -147,34 +147,34 @@ create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) ->
drop(Ref, Name) ->
drop(Ref, Name, []).
drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/5, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec delete(connection(), string(), key()) -> ok | {error, term()}.
delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun delete_nif/5, [Ref, Table, Key]).
?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]).
-spec delete_nif(reference(), non_neg_integer(), connection(), string(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) ->
-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun get_nif/5, [Ref, Table, Key]).
?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]).
-spec get_nif(reference(), non_neg_integer(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) ->
-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec put(connection(), string(), key(), value()) -> ok | {error, term()}.
put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun put_nif/6, [Ref, Table, Key, Value]).
?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]).
-spec put_nif(reference(), non_neg_integer(), connection(), string(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) ->
-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}.
@ -182,10 +182,10 @@ put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) ->
rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/6, [Ref, OldName, NewName, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), non_neg_integer(), connection(), string(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) ->
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}.
@ -193,10 +193,10 @@ rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) ->
salvage(Ref, Name) ->
salvage(Ref, Name, []).
salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/5, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec checkpoint(connection()) -> ok | {error, term()}.
@ -204,10 +204,10 @@ salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
checkpoint(_Ref) ->
checkpoint(_Ref, []).
checkpoint(Ref, Config) ->
?ASYNC_NIF_CALL(fun checkpoint_nif/4, [Ref, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, config_to_bin(Config)]).
-spec checkpoint_nif(reference(), non_neg_integer(), connection(), config()) -> ok | {error, term()}.
checkpoint_nif(_AsyncRef, _SchedulerId, _Ref, _Config) ->
-spec checkpoint_nif(reference(), connection(), config()) -> ok | {error, term()}.
checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}.
@ -221,10 +221,10 @@ truncate(Ref, Name, Config) ->
truncate(Ref, Name, Start, Stop) ->
truncate(Ref, Name, Start, Stop, []).
truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/7, [Ref, Name, Start, Stop, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), non_neg_integer(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) ->
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}.
@ -232,10 +232,10 @@ truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) ->
upgrade(Ref, Name) ->
upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/5, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}.
@ -243,10 +243,10 @@ upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
verify(Ref, Name) ->
verify(Ref, Name, []).
verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/5, [Ref, Name, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}.
@ -254,114 +254,114 @@ verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) ->
cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/5, [Ref, Table, config_to_bin(Config)]).
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), non_neg_integer(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Config) ->
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
?nif_stub.
-spec cursor_close(cursor()) -> ok | {error, term()}.
cursor_close(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_close_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]).
-spec cursor_close_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}.
cursor_close_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_close_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]).
-spec cursor_next_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_next_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_key_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]).
-spec cursor_next_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_next_key_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_next_value_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]).
-spec cursor_next_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_next_value_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]).
-spec cursor_prev_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}.
cursor_prev_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_key_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]).
-spec cursor_prev_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}.
cursor_prev_key_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_prev_value_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]).
-spec cursor_prev_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}.
cursor_prev_value_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_nif/4, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]).
-spec cursor_search_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
-spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub.
-spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_search_near_nif/4, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]).
-spec cursor_search_near_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
-spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}.
cursor_search_near_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub.
-spec cursor_reset(cursor()) -> ok | {error, term()}.
cursor_reset(Cursor) ->
?ASYNC_NIF_CALL(fun cursor_reset_nif/3, [Cursor]).
?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]).
-spec cursor_reset_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}.
cursor_reset_nif(_AsyncRef, _SchedulerId, _Cursor) ->
-spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}.
cursor_reset_nif(_AsyncRef, _Cursor) ->
?nif_stub.
-spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_insert_nif/5, [Cursor, Key, Value]).
?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]).
-spec cursor_insert_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) ->
-spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) ->
?nif_stub.
-spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}.
cursor_update(Cursor, Key, Value) ->
?ASYNC_NIF_CALL(fun cursor_update_nif/5, [Cursor, Key, Value]).
?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]).
-spec cursor_update_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_update_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) ->
-spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}.
cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) ->
?nif_stub.
-spec cursor_remove(cursor(), key()) -> ok | {error, term()}.
cursor_remove(Cursor, Key) ->
?ASYNC_NIF_CALL(fun cursor_remove_nif/4, [Cursor, Key]).
?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]).
-spec cursor_remove_nif(reference(), non_neg_integer(), cursor(), key()) -> ok | {error, term()}.
cursor_remove_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) ->
-spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}.
cursor_remove_nif(_AsyncRef, _Cursor, _Key) ->
?nif_stub.
-type fold_keys_fun() :: fun((Key::binary(), any()) -> any()).