diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 75e165c..b81f882 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -33,6 +33,7 @@ extern "C" { #endif #define ASYNC_NIF_MAX_WORKERS 128 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 1024 struct async_nif_req_entry { ERL_NIF_TERM ref, *argv; @@ -48,10 +49,8 @@ struct async_nif_work_queue { ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; unsigned int depth; -#ifdef ASYNC_NIF_STATS - struct async_stats stats; -#endif STAILQ_HEAD(reqs, async_nif_req_entry) reqs; + // TODO: struct async_nif_req_entry items[ASYNC_NIF_WORKER_QUEUE_SIZE]; }; struct async_nif_worker_entry { @@ -81,12 +80,10 @@ struct async_nif_state { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - int scheduler_id = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ - /* argv[1] is the current Erlang (scheduler_id - 1) */ \ - const ERL_NIF_TERM *argv = argv_in + 2; \ - argc -= 2; \ + const ERL_NIF_TERM *argv = argv_in + 1; \ + argc -= 1; \ struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \ if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ @@ -118,8 +115,7 @@ struct async_nif_state { req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ - enif_get_int(env, argv_in[1], &scheduler_id); \ - return async_nif_enqueue_req(async_nif, req, scheduler_id); \ + return async_nif_enqueue_req(async_nif, req); \ } #define ASYNC_NIF_INIT(name) \ @@ -155,7 +151,7 @@ struct async_nif_state { #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) static ERL_NIF_TERM -async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int scheduler_id) +async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) { /* If we're shutting down return an error term and ignore the request. */ if (async_nif->shutdown) { @@ -163,27 +159,26 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_make_atom(req->env, "shutdown")); } - /* We manage one request queue per-scheduler thread running in the Erlang VM. - Each request is placed onto the queue based on which schdeuler thread - was processing the request. Work queues are balanced only if requests - arrive from a sufficiently random distribution of Erlang scheduler - threads. */ unsigned int qid = async_nif->next_q; // Keep a local to avoid the race. struct async_nif_work_queue *q = &async_nif->queues[qid]; - if (q->depth > 10) - async_nif->next_q = (qid + 1) % async_nif->num_queues; + while (q->depth == ASYNC_NIF_WORKER_QUEUE_SIZE) { + qid = (qid + 1) % async_nif->num_queues; + q = &async_nif->queues[qid]; + } + /* TODO: + if (q->avg_latency > 5) { + async_nif->next_q = (qid + 1) % async_nif->num_queues; + } + */ /* Otherwise, add the request to the work queue. */ enif_mutex_lock(q->reqs_mutex); STAILQ_INSERT_TAIL(&q->reqs, req, entries); q->depth++; - //fprintf(stderr, "enqueued %d (%d)\r\n", qid, async_nif->req_count); fflush(stderr); - /* Build the term before releasing the lock so as not to race on the use of - the req pointer. */ + the req pointer (which will soon become invalid). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"), - enif_make_int(req->env, q->depth))); + enif_make_atom(req->env, "enqueued")); enif_mutex_unlock(q->reqs_mutex); enif_cond_signal(q->reqs_cnd); return reply; @@ -212,15 +207,18 @@ async_nif_worker_fn(void *arg) enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; } else { - /* At this point, `req` is ours to execute and we hold the reqs_mutex lock. */ + /* At this point the next req is ours to process and we hold the + reqs_mutex lock. */ do { /* Take the request off the queue. */ - //fprintf(stderr, "worker %d queue %d performing req (%d)\r\n", worker_id, (worker_id % async_nif->num_queues), async_nif->req_count); fflush(stderr); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); q->depth--; enif_mutex_unlock(q->reqs_mutex); + /* Wake up another thread working on this queue. */ + enif_cond_signal(q->reqs_cnd); + /* Finally, do the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_post(req->args); @@ -233,7 +231,6 @@ async_nif_worker_fn(void *arg) if (STAILQ_EMPTY(&q->reqs)) { req = NULL; } else { - enif_cond_signal(q->reqs_cnd); enif_mutex_lock(q->reqs_mutex); req = STAILQ_FIRST(&q->reqs); } diff --git a/c_src/wterl.c b/c_src/wterl.c index 5198512..e8d0c55 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -70,8 +70,8 @@ typedef struct { typedef struct { WT_CONNECTION *conn; const char *session_config; - ErlNifMutex *context_mutex; unsigned int num_contexts; + ErlNifMutex *contexts_mutex; WterlCtx contexts[ASYNC_NIF_MAX_WORKERS]; } WterlConnHandle; @@ -102,17 +102,17 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION * *session = ctx->session; if (*session == NULL) { /* Create a context for this worker thread to reuse. */ - enif_mutex_lock(conn_handle->context_mutex); + enif_mutex_lock(conn_handle->contexts_mutex); WT_CONNECTION *conn = conn_handle->conn; int rc = conn->open_session(conn, NULL, conn_handle->session_config, session); if (rc != 0) { - enif_mutex_unlock(conn_handle->context_mutex); + enif_mutex_unlock(conn_handle->contexts_mutex); return rc; } ctx->session = *session; ctx->cursors = kh_init(cursors); conn_handle->num_contexts++; - enif_mutex_unlock(conn_handle->context_mutex); + enif_mutex_unlock(conn_handle->contexts_mutex); } return 0; } @@ -120,7 +120,7 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION * /** * Close all sessions and all cursors open on any objects. * - * Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex) + * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) */ void __close_all_sessions(WterlConnHandle *conn_handle) @@ -142,7 +142,7 @@ __close_all_sessions(WterlConnHandle *conn_handle) } kh_destroy(cursors, h); session->close(session, NULL); - memset(&conn_handle->contexts[i], 0, sizeof(WterlCtx)); + ctx->session = NULL; } } conn_handle->num_contexts = 0; @@ -151,7 +151,7 @@ __close_all_sessions(WterlConnHandle *conn_handle) /** * Close cursors open on 'uri' object. * - * Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex) + * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) */ void __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race? @@ -189,18 +189,18 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *cursor = (WT_CURSOR*)kh_value(h, itr); } else { // key does not exist in hash table, create and insert one - enif_mutex_lock(conn_handle->context_mutex); + enif_mutex_lock(conn_handle->contexts_mutex); WT_SESSION *session = conn_handle->contexts[worker_id].session; int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); if (rc != 0) { - enif_mutex_unlock(conn_handle->context_mutex); + enif_mutex_unlock(conn_handle->contexts_mutex); return rc; } char *key = enif_alloc(sizeof(Uri)); if (!key) { session->close(session, NULL); - enif_mutex_unlock(conn_handle->context_mutex); + enif_mutex_unlock(conn_handle->contexts_mutex); return ENOMEM; } memcpy(key, uri, 128); @@ -208,7 +208,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char int itr_status; itr = kh_put(cursors, h, key, &itr_status); kh_value(h, itr) = *cursor; - enif_mutex_unlock(conn_handle->context_mutex); + enif_mutex_unlock(conn_handle->contexts_mutex); } return 0; } @@ -302,7 +302,7 @@ ASYNC_NIF_DECL( conn_handle->conn = conn; conn_handle->num_contexts = 0; memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); - conn_handle->context_mutex = enif_mutex_create(NULL); + conn_handle->contexts_mutex = enif_mutex_create(NULL); ERL_NIF_TERM result = enif_make_resource(env, conn_handle); enif_release_resource(conn_handle); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); @@ -338,7 +338,7 @@ ASYNC_NIF_DECL( { // work /* Free up the shared sessions and cursors. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_all_sessions(args->conn_handle); if (args->conn_handle->session_config) { enif_free((char *)args->conn_handle->session_config); @@ -346,8 +346,8 @@ ASYNC_NIF_DECL( } WT_CONNECTION* conn = args->conn_handle->conn; int rc = conn->close(conn, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); - enif_mutex_destroy(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_destroy(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -441,13 +441,13 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } @@ -459,7 +459,7 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } /* Note: we must first close all cursors referencing this object or this @@ -472,7 +472,7 @@ ASYNC_NIF_DECL( rc = session->drop(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -512,7 +512,7 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_cursors_on(args->conn_handle, args->oldname); ErlNifBinary config; @@ -537,7 +537,7 @@ ASYNC_NIF_DECL( // TODO: see drop's note, same goes here. rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -577,7 +577,7 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; @@ -599,7 +599,7 @@ ASYNC_NIF_DECL( rc = session->salvage(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -706,13 +706,13 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } @@ -725,7 +725,7 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } @@ -740,7 +740,7 @@ ASYNC_NIF_DECL( if (!args->from_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } @@ -748,7 +748,7 @@ ASYNC_NIF_DECL( if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } /* Position the start cursor at the first record or the specified record. */ @@ -758,7 +758,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } else { @@ -771,7 +771,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } @@ -779,7 +779,7 @@ ASYNC_NIF_DECL( if (!args->to_last) { if (!enif_inspect_binary(env, args->stop, &stop_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } @@ -788,7 +788,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } /* Position the stop cursor at the last record or the specified record. */ @@ -799,7 +799,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } else { @@ -813,7 +813,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } } @@ -825,7 +825,7 @@ ASYNC_NIF_DECL( if (start) start->close(start); if (stop) stop->close(stop); if (session) session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -862,13 +862,13 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } @@ -880,13 +880,13 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } rc = session->upgrade(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -924,13 +924,13 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->context_mutex); + enif_mutex_lock(args->conn_handle->contexts_mutex); __close_all_sessions(args->conn_handle); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } @@ -942,13 +942,13 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); return; } rc = session->verify(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->context_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1823,36 +1823,36 @@ on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM static ErlNifFunc nif_funcs[] = { - {"checkpoint_nif", 4, wterl_checkpoint}, - {"conn_close_nif", 3, wterl_conn_close}, - {"conn_open_nif", 5, wterl_conn_open}, - {"create_nif", 5, wterl_create}, - {"delete_nif", 5, wterl_delete}, - {"drop_nif", 5, wterl_drop}, - {"get_nif", 5, wterl_get}, - {"put_nif", 6, wterl_put}, - {"rename_nif", 6, wterl_rename}, - {"salvage_nif", 5, wterl_salvage}, - // TODO: {"txn_begin", 4, wterl_txn_begin}, - // TODO: {"txn_commit", 4, wterl_txn_commit}, - // TODO: {"txn_abort", 4, wterl_txn_abort}, - {"truncate_nif", 7, wterl_truncate}, - {"upgrade_nif", 5, wterl_upgrade}, - {"verify_nif", 5, wterl_verify}, - {"cursor_close_nif", 3, wterl_cursor_close}, - {"cursor_insert_nif", 5, wterl_cursor_insert}, - {"cursor_next_key_nif", 3, wterl_cursor_next_key}, - {"cursor_next_nif", 3, wterl_cursor_next}, - {"cursor_next_value_nif", 3, wterl_cursor_next_value}, - {"cursor_open_nif", 5, wterl_cursor_open}, - {"cursor_prev_key_nif", 3, wterl_cursor_prev_key}, - {"cursor_prev_nif", 3, wterl_cursor_prev}, - {"cursor_prev_value_nif", 3, wterl_cursor_prev_value}, - {"cursor_remove_nif", 4, wterl_cursor_remove}, - {"cursor_reset_nif", 3, wterl_cursor_reset}, - {"cursor_search_near_nif", 4, wterl_cursor_search_near}, - {"cursor_search_nif", 4, wterl_cursor_search}, - {"cursor_update_nif", 5, wterl_cursor_update}, + {"checkpoint_nif", 3, wterl_checkpoint}, + {"conn_close_nif", 2, wterl_conn_close}, + {"conn_open_nif", 4, wterl_conn_open}, + {"create_nif", 4, wterl_create}, + {"delete_nif", 4, wterl_delete}, + {"drop_nif", 4, wterl_drop}, + {"get_nif", 4, wterl_get}, + {"put_nif", 5, wterl_put}, + {"rename_nif", 5, wterl_rename}, + {"salvage_nif", 4, wterl_salvage}, + // TODO: {"txn_begin", 3, wterl_txn_begin}, + // TODO: {"txn_commit", 3, wterl_txn_commit}, + // TODO: {"txn_abort", 3, wterl_txn_abort}, + {"truncate_nif", 6, wterl_truncate}, + {"upgrade_nif", 4, wterl_upgrade}, + {"verify_nif", 4, wterl_verify}, + {"cursor_close_nif", 2, wterl_cursor_close}, + {"cursor_insert_nif", 4, wterl_cursor_insert}, + {"cursor_next_key_nif", 2, wterl_cursor_next_key}, + {"cursor_next_nif", 2, wterl_cursor_next}, + {"cursor_next_value_nif", 2, wterl_cursor_next_value}, + {"cursor_open_nif", 4, wterl_cursor_open}, + {"cursor_prev_key_nif", 2, wterl_cursor_prev_key}, + {"cursor_prev_nif", 2, wterl_cursor_prev}, + {"cursor_prev_value_nif", 2, wterl_cursor_prev_value}, + {"cursor_remove_nif", 3, wterl_cursor_remove}, + {"cursor_reset_nif", 2, wterl_cursor_reset}, + {"cursor_search_near_nif", 3, wterl_cursor_search_near}, + {"cursor_search_nif", 3, wterl_cursor_search}, + {"cursor_update_nif", 4, wterl_cursor_update}, }; ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload); diff --git a/src/async_nif.hrl b/src/async_nif.hrl index d0ffd0f..5110fa2 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -24,9 +24,8 @@ -define(ASYNC_NIF_CALL(Fun, Args), begin NIFRef = erlang:make_ref(), - Id = erlang:system_info(scheduler_id) - 1, - case erlang:apply(Fun, [NIFRef|[Id|Args]]) of - {ok, {enqueued, _QDepth}} -> + case erlang:apply(Fun, [NIFRef|Args]) of + {ok, enqueued} -> receive {NIFRef, {error, shutdown}=Error} -> %% Work unit was queued, but not executed. diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 039417e..ec1cd15 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -51,6 +51,7 @@ -define(CAPABILITIES, [async_fold]). -record(state, {table :: string(), + type :: string(), connection :: wterl:connection(), is_empty_cursor :: wterl:cursor(), status_cursor :: wterl:cursor()}). @@ -94,23 +95,31 @@ start(Partition, Config) -> case AppStart of ok -> {ok, Connection} = establish_connection(Config), - Table = "lsm:wt" ++ integer_to_list(Partition), + Type = wterl:config_value(type, Config, "lsm"), + Table = Type ++ ":wt" ++ integer_to_list(Partition), TableOpts = - [{block_compressor, "snappy"}, - {internal_page_max, "128K"}, - {leaf_page_max, "128K"}, - {lsm_chunk_size, "25MB"}, - {lsm_bloom_newest, true}, - {lsm_bloom_oldest, true} , - {lsm_bloom_bit_count, 128}, - {lsm_bloom_hash_count, 64}, - {lsm_bloom_config, [{leaf_page_max, "8MB"}]} - ], + case Type of + "lsm" -> + [{block_compressor, "snappy"}, + {internal_page_max, "128K"}, + {leaf_page_max, "128K"}, + {lsm_chunk_size, "25MB"}, + {lsm_bloom_newest, true}, + {lsm_bloom_oldest, true} , + {lsm_bloom_bit_count, 128}, + {lsm_bloom_hash_count, 64}, + {lsm_bloom_config, [{leaf_page_max, "8MB"}]} ]; + "table" -> + [{block_compressor, "snappy"}]; + _ -> + [] + end, case wterl:create(Connection, Table, TableOpts) of ok -> case establish_utility_cursors(Connection, Table) of {ok, IsEmptyCursor, StatusCursor} -> - {ok, #state{table=Table, connection=Connection, + {ok, #state{table=Table, type=Type, + connection=Connection, is_empty_cursor=IsEmptyCursor, status_cursor=StatusCursor}}; {error, Reason2} -> @@ -345,7 +354,7 @@ establish_utility_cursors(Connection, Table) -> end. %% @private -establish_connection(Config) -> +establish_connection(Config, Type) -> %% Get the data root directory case app_helper:get_prop_or_env(data_root, Config, wterl) of undefined -> @@ -366,7 +375,7 @@ establish_connection(Config) -> wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec %% NOTE: LSM auto-checkpoints, so we don't have too. - %% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec + [wterl:config_value(checkpoint, Config, [{wait, 10}]) || Type =:= "table"], wterl:config_value(verbose, Config, [ %"ckpt" "block", "shared_cache", "evictserver", "fileops", %"hazard", "mutex", "read", "readserver", "reconcile", diff --git a/src/wterl.erl b/src/wterl.erl index 67aaa28..aae5f7e 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -115,20 +115,20 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) -> -spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. conn_open(HomeDir, ConnectionConfig, SessionConfig) -> - ?ASYNC_NIF_CALL(fun conn_open_nif/5, [HomeDir, + ?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir, config_to_bin(ConnectionConfig), config_to_bin(SessionConfig)]). --spec conn_open_nif(reference(), non_neg_integer(), string(), config(), config()) -> {ok, connection()} | {error, term()}. -conn_open_nif(_AsyncRef, _SchedulerId, _HomeDir, _ConnectionConfig, _SessionConfig) -> +-spec conn_open_nif(reference(), string(), config(), config()) -> {ok, connection()} | {error, term()}. +conn_open_nif(_AsyncRef, _HomeDir, _ConnectionConfig, _SessionConfig) -> ?nif_stub. -spec connection_close(connection()) -> ok | {error, term()}. connection_close(ConnRef) -> - ?ASYNC_NIF_CALL(fun conn_close_nif/3, [ConnRef]). + ?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]). --spec conn_close_nif(reference(), non_neg_integer(), connection()) -> ok | {error, term()}. -conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) -> +-spec conn_close_nif(reference(), connection()) -> ok | {error, term()}. +conn_close_nif(_AsyncRef, _ConnRef) -> ?nif_stub. -spec create(connection(), string()) -> ok | {error, term()}. @@ -136,10 +136,10 @@ conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) -> create(Ref, Name) -> create(Ref, Name, []). create(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun create_nif/5, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]). --spec create_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. -create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) -> +-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. +create_nif(_AsyncNif, _Ref, _Name, _Config) -> ?nif_stub. -spec drop(connection(), string()) -> ok | {error, term()}. @@ -147,34 +147,34 @@ create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) -> drop(Ref, Name) -> drop(Ref, Name, []). drop(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun drop_nif/5, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]). --spec drop_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. -drop_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> +-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. +drop_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec delete(connection(), string(), key()) -> ok | {error, term()}. delete(Ref, Table, Key) -> - ?ASYNC_NIF_CALL(fun delete_nif/5, [Ref, Table, Key]). + ?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]). --spec delete_nif(reference(), non_neg_integer(), connection(), string(), key()) -> ok | {error, term()}. -delete_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) -> +-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}. +delete_nif(_AsyncRef, _Ref, _Table, _Key) -> ?nif_stub. -spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. get(Ref, Table, Key) -> - ?ASYNC_NIF_CALL(fun get_nif/5, [Ref, Table, Key]). + ?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]). --spec get_nif(reference(), non_neg_integer(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -get_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) -> +-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. +get_nif(_AsyncRef, _Ref, _Table, _Key) -> ?nif_stub. -spec put(connection(), string(), key(), value()) -> ok | {error, term()}. put(Ref, Table, Key, Value) -> - ?ASYNC_NIF_CALL(fun put_nif/6, [Ref, Table, Key, Value]). + ?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]). --spec put_nif(reference(), non_neg_integer(), connection(), string(), key(), value()) -> ok | {error, term()}. -put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) -> +-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}. +put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) -> ?nif_stub. -spec rename(connection(), string(), string()) -> ok | {error, term()}. @@ -182,10 +182,10 @@ put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) -> rename(Ref, OldName, NewName) -> rename(Ref, OldName, NewName, []). rename(Ref, OldName, NewName, Config) -> - ?ASYNC_NIF_CALL(fun rename_nif/6, [Ref, OldName, NewName, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]). --spec rename_nif(reference(), non_neg_integer(), connection(), string(), string(), config()) -> ok | {error, term()}. -rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) -> +-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}. +rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) -> ?nif_stub. -spec salvage(connection(), string()) -> ok | {error, term()}. @@ -193,10 +193,10 @@ rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) -> salvage(Ref, Name) -> salvage(Ref, Name, []). salvage(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun salvage_nif/5, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]). --spec salvage_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. -salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> +-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. +salvage_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec checkpoint(connection()) -> ok | {error, term()}. @@ -204,10 +204,10 @@ salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> checkpoint(_Ref) -> checkpoint(_Ref, []). checkpoint(Ref, Config) -> - ?ASYNC_NIF_CALL(fun checkpoint_nif/4, [Ref, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, config_to_bin(Config)]). --spec checkpoint_nif(reference(), non_neg_integer(), connection(), config()) -> ok | {error, term()}. -checkpoint_nif(_AsyncRef, _SchedulerId, _Ref, _Config) -> +-spec checkpoint_nif(reference(), connection(), config()) -> ok | {error, term()}. +checkpoint_nif(_AsyncRef, _Ref, _Config) -> ?nif_stub. -spec truncate(connection(), string()) -> ok | {error, term()}. @@ -221,10 +221,10 @@ truncate(Ref, Name, Config) -> truncate(Ref, Name, Start, Stop) -> truncate(Ref, Name, Start, Stop, []). truncate(Ref, Name, Start, Stop, Config) -> - ?ASYNC_NIF_CALL(fun truncate_nif/7, [Ref, Name, Start, Stop, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]). --spec truncate_nif(reference(), non_neg_integer(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}. -truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) -> +-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}. +truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) -> ?nif_stub. -spec upgrade(connection(), string()) -> ok | {error, term()}. @@ -232,10 +232,10 @@ truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) -> upgrade(Ref, Name) -> upgrade(Ref, Name, []). upgrade(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun upgrade_nif/5, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]). --spec upgrade_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. -upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> +-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. +upgrade_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec verify(connection(), string()) -> ok | {error, term()}. @@ -243,10 +243,10 @@ upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> verify(Ref, Name) -> verify(Ref, Name, []). verify(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun verify_nif/5, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]). --spec verify_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. -verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> +-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. +verify_nif(_AsyncRef, _Ref, _Name, _Config) -> ?nif_stub. -spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}. @@ -254,114 +254,114 @@ verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> cursor_open(Ref, Table) -> cursor_open(Ref, Table, []). cursor_open(Ref, Table, Config) -> - ?ASYNC_NIF_CALL(fun cursor_open_nif/5, [Ref, Table, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]). --spec cursor_open_nif(reference(), non_neg_integer(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}. -cursor_open_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Config) -> +-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}. +cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) -> ?nif_stub. -spec cursor_close(cursor()) -> ok | {error, term()}. cursor_close(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_close_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]). --spec cursor_close_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}. -cursor_close_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}. +cursor_close_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_next(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]). --spec cursor_next_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_next_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_next_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_next_key(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_key_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]). --spec cursor_next_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_next_key_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_next_key_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_next_value(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_value_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]). --spec cursor_next_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_next_value_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_next_value_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_prev(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]). --spec cursor_prev_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_prev_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_prev_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_prev_key(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_key_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]). --spec cursor_prev_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_prev_key_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_prev_key_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_prev_value(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_value_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]). --spec cursor_prev_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_prev_value_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_prev_value_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_search_nif/4, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]). --spec cursor_search_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> +-spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search_near(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_search_near_nif/4, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]). --spec cursor_search_near_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search_near_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> +-spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_near_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -spec cursor_reset(cursor()) -> ok | {error, term()}. cursor_reset(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_reset_nif/3, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]). --spec cursor_reset_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}. -cursor_reset_nif(_AsyncRef, _SchedulerId, _Cursor) -> +-spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}. +cursor_reset_nif(_AsyncRef, _Cursor) -> ?nif_stub. -spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}. cursor_insert(Cursor, Key, Value) -> - ?ASYNC_NIF_CALL(fun cursor_insert_nif/5, [Cursor, Key, Value]). + ?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]). --spec cursor_insert_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}. -cursor_insert_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) -> +-spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}. cursor_update(Cursor, Key, Value) -> - ?ASYNC_NIF_CALL(fun cursor_update_nif/5, [Cursor, Key, Value]). + ?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]). --spec cursor_update_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}. -cursor_update_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) -> +-spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_remove(cursor(), key()) -> ok | {error, term()}. cursor_remove(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_remove_nif/4, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]). --spec cursor_remove_nif(reference(), non_neg_integer(), cursor(), key()) -> ok | {error, term()}. -cursor_remove_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> +-spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}. +cursor_remove_nif(_AsyncRef, _Cursor, _Key) -> ?nif_stub. -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()).