diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 979b1b7..5c0a4d9 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -28,7 +28,7 @@ extern "C" { #include "queue.h" -#define ASYNC_NIF_MAX_WORKERS 32 +#define ASYNC_NIF_MAX_WORKERS 128 struct async_nif_req_entry { ERL_NIF_TERM ref, *argv; @@ -40,25 +40,27 @@ struct async_nif_req_entry { STAILQ_ENTRY(async_nif_req_entry) entries; }; +struct async_nif_work_queue { + ErlNifMutex *reqs_mutex; + ErlNifCond *reqs_cnd; + STAILQ_HEAD(reqs, async_nif_req_entry) reqs; +}; + struct async_nif_worker_entry { ErlNifTid tid; - LIST_ENTRY(async_nif_worker_entry) entries; + unsigned int worker_id; + struct async_nif_state *async_nif; + struct async_nif_work_queue *q; }; struct async_nif_state { - volatile unsigned int req_count; - volatile unsigned int shutdown; - ErlNifMutex *req_mutex; - ErlNifCond *cnd; - STAILQ_HEAD(reqs, async_nif_req_entry) reqs; + unsigned int req_count; + unsigned int shutdown; unsigned int num_workers; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; -}; - -struct async_nif_worker_info { - struct async_nif_state *async_nif; - struct async_nif_worker_entry *worker; - unsigned int worker_id; + unsigned int num_queues; + unsigned int next_q; + struct async_nif_work_queue queues[ASYNC_NIF_MAX_WORKERS]; }; #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ @@ -72,10 +74,12 @@ struct async_nif_worker_info { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ + int scheduler_id = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ - const ERL_NIF_TERM *argv = argv_in + 1; \ - argc--; \ + /* argv[1] is the current Erlang (scheduler_id - 1) */ \ + const ERL_NIF_TERM *argv = argv_in + 2; \ + argc -= 2; \ struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \ if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ @@ -107,7 +111,8 @@ struct async_nif_worker_info { req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ - return async_nif_enqueue_req(async_nif, req); \ + enif_get_int(env, argv_in[1], &scheduler_id); \ + return async_nif_enqueue_req(async_nif, req, scheduler_id); \ } #define ASYNC_NIF_INIT(name) \ @@ -143,7 +148,7 @@ struct async_nif_worker_info { #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) static ERL_NIF_TERM -async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) +async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int scheduler_id) { /* If we're shutting down return an error term and ignore the request. */ if (async_nif->shutdown) { @@ -151,51 +156,62 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_make_atom(req->env, "shutdown")); } + /* We manage one request queue per-scheduler thread running in the Erlang VM. + Each request is placed onto the queue based on which schdeuler thread + was processing the request. Work queues are balanced only if requests + arrive from a sufficiently random distribution of Erlang scheduler + threads. */ + unsigned int qid = async_nif->next_q; // Keep a local to avoid the race. + struct async_nif_work_queue *q = &async_nif->queues[qid]; + async_nif->next_q = (qid + 1) % async_nif->num_queues; + /* Otherwise, add the request to the work queue. */ - enif_mutex_lock(async_nif->req_mutex); - STAILQ_INSERT_TAIL(&async_nif->reqs, req, entries); + enif_mutex_lock(q->reqs_mutex); + STAILQ_INSERT_TAIL(&q->reqs, req, entries); async_nif->req_count++; + //fprintf(stderr, "enqueued %d (%d)\r\n", qid, async_nif->req_count); fflush(stderr); + /* Build the term before releasing the lock so as not to race on the use of the req pointer. */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"), enif_make_int(req->env, async_nif->req_count))); - enif_mutex_unlock(async_nif->req_mutex); - enif_cond_broadcast(async_nif->cnd); + enif_mutex_unlock(q->reqs_mutex); + enif_cond_broadcast(q->reqs_cnd); return reply; } static void * async_nif_worker_fn(void *arg) { - struct async_nif_worker_info *wi = (struct async_nif_worker_info *)arg; - struct async_nif_state *async_nif = wi->async_nif; - unsigned int worker_id = wi->worker_id; - - enif_free(arg); // Allocated when starting the thread, now no longer needed. + struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg; + unsigned int worker_id = we->worker_id; + struct async_nif_state *async_nif = we->async_nif; + struct async_nif_work_queue *q = we->q; for(;;) { struct async_nif_req_entry *req = NULL; /* Examine the request queue, are there things to be done? */ - enif_mutex_lock(async_nif->req_mutex); + enif_mutex_lock(q->reqs_mutex); check_again_for_work: if (async_nif->shutdown) { - enif_mutex_unlock(async_nif->req_mutex); + enif_mutex_unlock(q->reqs_mutex); break; } - if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) { + if ((req = STAILQ_FIRST(&q->reqs)) == NULL) { /* Queue is empty, wait for work */ - enif_cond_wait(async_nif->cnd, async_nif->req_mutex); + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; } else { - /* `req` is our work request and we hold the req_mutex lock. */ + /* At this point, `req` is ours to execute and we hold the reqs_mutex lock. */ do { /* Take the request off the queue. */ - STAILQ_REMOVE(&async_nif->reqs, req, async_nif_req_entry, entries); + //fprintf(stderr, "worker %d queue %d performing req (%d)\r\n", worker_id, (worker_id % async_nif->num_queues), async_nif->req_count); fflush(stderr); + STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); async_nif->req_count--; - enif_mutex_unlock(async_nif->req_mutex); + enif_mutex_unlock(q->reqs_mutex); /* Finally, do the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); @@ -204,14 +220,14 @@ async_nif_worker_fn(void *arg) enif_free_env(req->env); enif_free(req); - /* Review the work queue, start more worker threads if they are needed. */ - // TODO: if queue_depth > last_depth && num_workers < MAX, start one up - /* Continue working if more requests are in the queue, otherwise wait for new work to arrive. */ - enif_mutex_lock(async_nif->req_mutex); - if ((req = STAILQ_FIRST(&async_nif->reqs)) == NULL) { - enif_mutex_unlock(async_nif->req_mutex); + if (STAILQ_EMPTY(&q->reqs)) { + req = NULL; + } else { + enif_cond_broadcast(q->reqs_cnd); + enif_mutex_lock(q->reqs_mutex); + req = STAILQ_FIRST(&q->reqs); } } while(req); @@ -228,10 +244,13 @@ async_nif_unload(ErlNifEnv *env) struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); /* Signal the worker threads, stop what you're doing and exit. */ - enif_mutex_lock(async_nif->req_mutex); async_nif->shutdown = 1; - enif_cond_broadcast(async_nif->cnd); - enif_mutex_unlock(async_nif->req_mutex); + + /* Wake up any waiting worker threads. */ + for (i = 0; i < async_nif->num_queues; i++) { + struct async_nif_work_queue *q = &async_nif->queues[i]; + enif_cond_broadcast(q->reqs_cnd); + } /* Join for the now exiting worker threads. */ for (i = 0; i < async_nif->num_workers; ++i) { @@ -239,30 +258,26 @@ async_nif_unload(ErlNifEnv *env) enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); } - /* We won't get here until all threads have exited. - Patch things up, and carry on. */ - enif_mutex_lock(async_nif->req_mutex); + /* Cleanup requests, mutexes and conditions in each work queue. */ + for (i = 0; i < async_nif->num_queues; i++) { + struct async_nif_work_queue *q = &async_nif->queues[i]; + enif_mutex_destroy(q->reqs_mutex); + enif_cond_destroy(q->reqs_cnd); - /* Worker threads are stopped, now toss anything left in the queue. */ - struct async_nif_req_entry *req = NULL; - STAILQ_FOREACH(req, &async_nif->reqs, entries) { - STAILQ_REMOVE(&async_nif->reqs, STAILQ_LAST(&async_nif->reqs, async_nif_req_entry, entries), - async_nif_req_entry, entries); - enif_send(NULL, &req->pid, req->env, - enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), - enif_make_atom(req->env, "shutdown"))); - req->fn_post(req->args); - enif_free(req->args); - enif_free(req); - async_nif->req_count--; + /* Worker threads are stopped, now toss anything left in the queue. */ + struct async_nif_req_entry *req = NULL; + STAILQ_FOREACH(req, &q->reqs, entries) { + STAILQ_REMOVE(&q->reqs, STAILQ_LAST(&q->reqs, async_nif_req_entry, entries), + async_nif_req_entry, entries); + enif_send(NULL, &req->pid, req->env, + enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), + enif_make_atom(req->env, "shutdown"))); + req->fn_post(req->args); + enif_free(req->args); + enif_free(req); + async_nif->req_count--; + } } - enif_mutex_unlock(async_nif->req_mutex); - - bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_cond_destroy(async_nif->cnd); - async_nif->cnd = NULL; - enif_mutex_destroy(async_nif->req_mutex); - async_nif->req_mutex = NULL; bzero(async_nif, sizeof(struct async_nif_state)); enif_free(async_nif); } @@ -271,7 +286,7 @@ static void * async_nif_load(void) { static int has_init = 0; - int i, num_schedulers; + unsigned int i, j; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -281,53 +296,59 @@ async_nif_load(void) /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); - num_schedulers = info.scheduler_threads; /* Init our portion of priv_data's module-specific state. */ async_nif = enif_alloc(sizeof(struct async_nif_state)); if (!async_nif) return NULL; bzero(async_nif, sizeof(struct async_nif_state)); - STAILQ_INIT(&(async_nif->reqs)); + + async_nif->num_queues = info.scheduler_threads; + async_nif->next_q = 0; + async_nif->req_count = 0; async_nif->shutdown = 0; - async_nif->req_mutex = enif_mutex_create(NULL); - async_nif->cnd = enif_cond_create(NULL); - - /* Setup the requests management. */ - async_nif->req_count = 0; + for (i = 0; i < async_nif->num_queues; i++) { + struct async_nif_work_queue *q = &async_nif->queues[i]; + STAILQ_INIT(&q->reqs); + q->reqs_mutex = enif_mutex_create(NULL); + q->reqs_cnd = enif_cond_create(NULL); + } /* Setup the thread pool management. */ bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - /* Start the minimum of max workers allowed or number of scheduler threads running. */ - unsigned int num_worker_threads = ASYNC_NIF_MAX_WORKERS; - if (num_schedulers < ASYNC_NIF_MAX_WORKERS) - num_worker_threads = num_schedulers; - if (num_worker_threads < 1) - num_worker_threads = 1; - num_worker_threads = ASYNC_NIF_MAX_WORKERS; // TODO: make this dynamic at some point + /* Start the worker threads. */ + //unsigned int num_workers = ASYNC_NIF_MAX_WORKERS - (ASYNC_NIF_MAX_WORKERS % async_nif->num_queues); + unsigned int num_workers = async_nif->num_queues; + //unsigned int allocation = 1; + //if (num_workers > async_nif->num_queues) { + // allocation = num_workers / async_nif->num_queues; + //} - for (i = 0; i < num_worker_threads; i++) { - struct async_nif_worker_info *wi; - wi = enif_alloc(sizeof(struct async_nif_worker_info)); // TODO: check - bzero(wi, sizeof(struct async_nif_worker_info)); - wi->async_nif = async_nif; - wi->worker = &async_nif->worker_entries[i]; - wi->worker_id = i; + for (i = 0; i < num_workers; i++) { + struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; + we->async_nif = async_nif; + we->worker_id = i; + we->q = &async_nif->queues[i % async_nif->num_queues]; + //fprintf(stderr, "%d:%d:%d | allocating worker_id %d to queue %d\r\n", num_workers, async_nif->num_queues, allocation, i, i % async_nif->num_queues); fflush(stderr); if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)wi, NULL) != 0) { + &async_nif_worker_fn, (void*)we, NULL) != 0) { async_nif->shutdown = 1; - enif_cond_broadcast(async_nif->cnd); + + for (j = 0; j < async_nif->num_queues; j++) { + struct async_nif_work_queue *q = &async_nif->queues[j]; + enif_cond_broadcast(q->reqs_cnd); + enif_mutex_destroy(q->reqs_mutex); + enif_cond_destroy(q->reqs_cnd); + } + while(i-- > 0) { void *exit_value = 0; /* Ignore this. */ enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); } + bzero(async_nif->worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_cond_destroy(async_nif->cnd); - async_nif->cnd = NULL; - enif_mutex_destroy(async_nif->req_mutex); - async_nif->req_mutex = NULL; return NULL; } } diff --git a/c_src/wterl.c b/c_src/wterl.c index 0988eb7..69be497 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -30,17 +30,13 @@ #ifdef DEBUG #include #include -void debugf(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - fprintf(stderr, "\r\n"); - fflush(stderr); - va_end(ap); -} +#define dprint(s, ...) do { \ + fprintf(stderr, s, ##__VA_ARGS__); \ + fprintf(stderr, "\r\n"); \ + fflush(stderr); \ + } while(0); #else -# define debugf(X, ...) {} +# define dprint(s, ...) {} #endif static ErlNifResourceType *wterl_conn_RESOURCE; @@ -282,7 +278,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } - //debugf("c: %d // %s\ns: %d // %s", config.size, (char *)config.data, (char *)session_config.data, session_config.size); + //dprint("c: %d // %s\ns: %d // %s", config.size, (char *)config.data, (char *)session_config.data, session_config.size); int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn); if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); @@ -1203,7 +1199,7 @@ ASYNC_NIF_DECL( /* We create a separate session here to ensure that operations are thread safe. */ WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; - //debugf("cursor open: %s", (char *)args->conn_handle->session_config); + //dprint("cursor open: %s", (char *)args->conn_handle->session_config); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -1823,36 +1819,36 @@ on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM static ErlNifFunc nif_funcs[] = { - {"checkpoint_nif", 3, wterl_checkpoint}, - {"conn_close_nif", 2, wterl_conn_close}, - {"conn_open_nif", 4, wterl_conn_open}, - {"create_nif", 4, wterl_create}, - {"delete_nif", 4, wterl_delete}, - {"drop_nif", 4, wterl_drop}, - {"get_nif", 4, wterl_get}, - {"put_nif", 5, wterl_put}, - {"rename_nif", 5, wterl_rename}, - {"salvage_nif", 4, wterl_salvage}, - // TODO: {"txn_begin", 3, wterl_txn_begin}, - // TODO: {"txn_commit", 3, wterl_txn_commit}, - // TODO: {"txn_abort", 3, wterl_txn_abort}, - {"truncate_nif", 6, wterl_truncate}, - {"upgrade_nif", 4, wterl_upgrade}, - {"verify_nif", 4, wterl_verify}, - {"cursor_close_nif", 2, wterl_cursor_close}, - {"cursor_insert_nif", 4, wterl_cursor_insert}, - {"cursor_next_key_nif", 2, wterl_cursor_next_key}, - {"cursor_next_nif", 2, wterl_cursor_next}, - {"cursor_next_value_nif", 2, wterl_cursor_next_value}, - {"cursor_open_nif", 4, wterl_cursor_open}, - {"cursor_prev_key_nif", 2, wterl_cursor_prev_key}, - {"cursor_prev_nif", 2, wterl_cursor_prev}, - {"cursor_prev_value_nif", 2, wterl_cursor_prev_value}, - {"cursor_remove_nif", 3, wterl_cursor_remove}, - {"cursor_reset_nif", 2, wterl_cursor_reset}, - {"cursor_search_near_nif", 3, wterl_cursor_search_near}, - {"cursor_search_nif", 3, wterl_cursor_search}, - {"cursor_update_nif", 4, wterl_cursor_update}, + {"checkpoint_nif", 4, wterl_checkpoint}, + {"conn_close_nif", 3, wterl_conn_close}, + {"conn_open_nif", 5, wterl_conn_open}, + {"create_nif", 5, wterl_create}, + {"delete_nif", 5, wterl_delete}, + {"drop_nif", 5, wterl_drop}, + {"get_nif", 5, wterl_get}, + {"put_nif", 6, wterl_put}, + {"rename_nif", 6, wterl_rename}, + {"salvage_nif", 5, wterl_salvage}, + // TODO: {"txn_begin", 4, wterl_txn_begin}, + // TODO: {"txn_commit", 4, wterl_txn_commit}, + // TODO: {"txn_abort", 4, wterl_txn_abort}, + {"truncate_nif", 7, wterl_truncate}, + {"upgrade_nif", 5, wterl_upgrade}, + {"verify_nif", 5, wterl_verify}, + {"cursor_close_nif", 3, wterl_cursor_close}, + {"cursor_insert_nif", 5, wterl_cursor_insert}, + {"cursor_next_key_nif", 3, wterl_cursor_next_key}, + {"cursor_next_nif", 3, wterl_cursor_next}, + {"cursor_next_value_nif", 3, wterl_cursor_next_value}, + {"cursor_open_nif", 5, wterl_cursor_open}, + {"cursor_prev_key_nif", 3, wterl_cursor_prev_key}, + {"cursor_prev_nif", 3, wterl_cursor_prev}, + {"cursor_prev_value_nif", 3, wterl_cursor_prev_value}, + {"cursor_remove_nif", 4, wterl_cursor_remove}, + {"cursor_reset_nif", 3, wterl_cursor_reset}, + {"cursor_search_near_nif", 4, wterl_cursor_search_near}, + {"cursor_search_nif", 4, wterl_cursor_search}, + {"cursor_update_nif", 5, wterl_cursor_update}, }; ERL_NIF_INIT(wterl, nif_funcs, &on_load, &on_reload, &on_upgrade, &on_unload); diff --git a/src/async_nif.hrl b/src/async_nif.hrl index 4d14a3a..d0ffd0f 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -24,7 +24,8 @@ -define(ASYNC_NIF_CALL(Fun, Args), begin NIFRef = erlang:make_ref(), - case erlang:apply(Fun, [NIFRef|Args]) of + Id = erlang:system_info(scheduler_id) - 1, + case erlang:apply(Fun, [NIFRef|[Id|Args]]) of {ok, {enqueued, _QDepth}} -> receive {NIFRef, {error, shutdown}=Error} -> diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 8492e7b..039417e 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -321,13 +321,14 @@ callback(_Ref, _Msg, State) -> %% =================================================================== %% @private -max_sessions(Config) -> - RingSize = - case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of - undefined -> 1024; - Size -> Size - end, - 2 * (RingSize * erlang:system_info(schedulers)). +max_sessions(_Config) -> % TODO: + 8192. + %% RingSize = + %% case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of + %% undefined -> 1024; + %% Size -> Size + %% end, + %% 2 * (RingSize * erlang:system_info(schedulers)). %% @private establish_utility_cursors(Connection, Table) -> diff --git a/src/wterl.erl b/src/wterl.erl index aae5f7e..67aaa28 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -115,20 +115,20 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) -> -spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. conn_open(HomeDir, ConnectionConfig, SessionConfig) -> - ?ASYNC_NIF_CALL(fun conn_open_nif/4, [HomeDir, + ?ASYNC_NIF_CALL(fun conn_open_nif/5, [HomeDir, config_to_bin(ConnectionConfig), config_to_bin(SessionConfig)]). --spec conn_open_nif(reference(), string(), config(), config()) -> {ok, connection()} | {error, term()}. -conn_open_nif(_AsyncRef, _HomeDir, _ConnectionConfig, _SessionConfig) -> +-spec conn_open_nif(reference(), non_neg_integer(), string(), config(), config()) -> {ok, connection()} | {error, term()}. +conn_open_nif(_AsyncRef, _SchedulerId, _HomeDir, _ConnectionConfig, _SessionConfig) -> ?nif_stub. -spec connection_close(connection()) -> ok | {error, term()}. connection_close(ConnRef) -> - ?ASYNC_NIF_CALL(fun conn_close_nif/2, [ConnRef]). + ?ASYNC_NIF_CALL(fun conn_close_nif/3, [ConnRef]). --spec conn_close_nif(reference(), connection()) -> ok | {error, term()}. -conn_close_nif(_AsyncRef, _ConnRef) -> +-spec conn_close_nif(reference(), non_neg_integer(), connection()) -> ok | {error, term()}. +conn_close_nif(_AsyncRef, _SchedulerId, _ConnRef) -> ?nif_stub. -spec create(connection(), string()) -> ok | {error, term()}. @@ -136,10 +136,10 @@ conn_close_nif(_AsyncRef, _ConnRef) -> create(Ref, Name) -> create(Ref, Name, []). create(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun create_nif/5, [Ref, Name, config_to_bin(Config)]). --spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -create_nif(_AsyncNif, _Ref, _Name, _Config) -> +-spec create_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. +create_nif(_AsyncNif, _SchedulerId, _Ref, _Name, _Config) -> ?nif_stub. -spec drop(connection(), string()) -> ok | {error, term()}. @@ -147,34 +147,34 @@ create_nif(_AsyncNif, _Ref, _Name, _Config) -> drop(Ref, Name) -> drop(Ref, Name, []). drop(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun drop_nif/5, [Ref, Name, config_to_bin(Config)]). --spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -drop_nif(_AsyncRef, _Ref, _Name, _Config) -> +-spec drop_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. +drop_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> ?nif_stub. -spec delete(connection(), string(), key()) -> ok | {error, term()}. delete(Ref, Table, Key) -> - ?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]). + ?ASYNC_NIF_CALL(fun delete_nif/5, [Ref, Table, Key]). --spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}. -delete_nif(_AsyncRef, _Ref, _Table, _Key) -> +-spec delete_nif(reference(), non_neg_integer(), connection(), string(), key()) -> ok | {error, term()}. +delete_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) -> ?nif_stub. -spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. get(Ref, Table, Key) -> - ?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]). + ?ASYNC_NIF_CALL(fun get_nif/5, [Ref, Table, Key]). --spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. -get_nif(_AsyncRef, _Ref, _Table, _Key) -> +-spec get_nif(reference(), non_neg_integer(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}. +get_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key) -> ?nif_stub. -spec put(connection(), string(), key(), value()) -> ok | {error, term()}. put(Ref, Table, Key, Value) -> - ?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]). + ?ASYNC_NIF_CALL(fun put_nif/6, [Ref, Table, Key, Value]). --spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}. -put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) -> +-spec put_nif(reference(), non_neg_integer(), connection(), string(), key(), value()) -> ok | {error, term()}. +put_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Key, _Value) -> ?nif_stub. -spec rename(connection(), string(), string()) -> ok | {error, term()}. @@ -182,10 +182,10 @@ put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) -> rename(Ref, OldName, NewName) -> rename(Ref, OldName, NewName, []). rename(Ref, OldName, NewName, Config) -> - ?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun rename_nif/6, [Ref, OldName, NewName, config_to_bin(Config)]). --spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}. -rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) -> +-spec rename_nif(reference(), non_neg_integer(), connection(), string(), string(), config()) -> ok | {error, term()}. +rename_nif(_AsyncRef, _SchedulerId, _Ref, _OldName, _NewName, _Config) -> ?nif_stub. -spec salvage(connection(), string()) -> ok | {error, term()}. @@ -193,10 +193,10 @@ rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) -> salvage(Ref, Name) -> salvage(Ref, Name, []). salvage(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun salvage_nif/5, [Ref, Name, config_to_bin(Config)]). --spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -salvage_nif(_AsyncRef, _Ref, _Name, _Config) -> +-spec salvage_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. +salvage_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> ?nif_stub. -spec checkpoint(connection()) -> ok | {error, term()}. @@ -204,10 +204,10 @@ salvage_nif(_AsyncRef, _Ref, _Name, _Config) -> checkpoint(_Ref) -> checkpoint(_Ref, []). checkpoint(Ref, Config) -> - ?ASYNC_NIF_CALL(fun checkpoint_nif/3, [Ref, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun checkpoint_nif/4, [Ref, config_to_bin(Config)]). --spec checkpoint_nif(reference(), connection(), config()) -> ok | {error, term()}. -checkpoint_nif(_AsyncRef, _Ref, _Config) -> +-spec checkpoint_nif(reference(), non_neg_integer(), connection(), config()) -> ok | {error, term()}. +checkpoint_nif(_AsyncRef, _SchedulerId, _Ref, _Config) -> ?nif_stub. -spec truncate(connection(), string()) -> ok | {error, term()}. @@ -221,10 +221,10 @@ truncate(Ref, Name, Config) -> truncate(Ref, Name, Start, Stop) -> truncate(Ref, Name, Start, Stop, []). truncate(Ref, Name, Start, Stop, Config) -> - ?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun truncate_nif/7, [Ref, Name, Start, Stop, config_to_bin(Config)]). --spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}. -truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) -> +-spec truncate_nif(reference(), non_neg_integer(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}. +truncate_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Start, _Stop, _Config) -> ?nif_stub. -spec upgrade(connection(), string()) -> ok | {error, term()}. @@ -232,10 +232,10 @@ truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) -> upgrade(Ref, Name) -> upgrade(Ref, Name, []). upgrade(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun upgrade_nif/5, [Ref, Name, config_to_bin(Config)]). --spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -upgrade_nif(_AsyncRef, _Ref, _Name, _Config) -> +-spec upgrade_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. +upgrade_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> ?nif_stub. -spec verify(connection(), string()) -> ok | {error, term()}. @@ -243,10 +243,10 @@ upgrade_nif(_AsyncRef, _Ref, _Name, _Config) -> verify(Ref, Name) -> verify(Ref, Name, []). verify(Ref, Name, Config) -> - ?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun verify_nif/5, [Ref, Name, config_to_bin(Config)]). --spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}. -verify_nif(_AsyncRef, _Ref, _Name, _Config) -> +-spec verify_nif(reference(), non_neg_integer(), connection(), string(), config()) -> ok | {error, term()}. +verify_nif(_AsyncRef, _SchedulerId, _Ref, _Name, _Config) -> ?nif_stub. -spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}. @@ -254,114 +254,114 @@ verify_nif(_AsyncRef, _Ref, _Name, _Config) -> cursor_open(Ref, Table) -> cursor_open(Ref, Table, []). cursor_open(Ref, Table, Config) -> - ?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]). + ?ASYNC_NIF_CALL(fun cursor_open_nif/5, [Ref, Table, config_to_bin(Config)]). --spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}. -cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) -> +-spec cursor_open_nif(reference(), non_neg_integer(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}. +cursor_open_nif(_AsyncRef, _SchedulerId, _Ref, _Table, _Config) -> ?nif_stub. -spec cursor_close(cursor()) -> ok | {error, term()}. cursor_close(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_close_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_close_nif/3, [Cursor]). --spec cursor_close_nif(reference(), cursor()) -> ok | {error, term()}. -cursor_close_nif(_AsyncRef, _Cursor) -> +-spec cursor_close_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}. +cursor_close_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_next(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_next(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_nif/3, [Cursor]). --spec cursor_next_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_next_nif(_AsyncRef, _Cursor) -> +-spec cursor_next_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_next_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_next_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_next_key(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_key_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_key_nif/3, [Cursor]). --spec cursor_next_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_next_key_nif(_AsyncRef, _Cursor) -> +-spec cursor_next_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_next_key_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_next_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_next_value(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_next_value_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_next_value_nif/3, [Cursor]). --spec cursor_next_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_next_value_nif(_AsyncRef, _Cursor) -> +-spec cursor_next_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_next_value_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_prev(cursor()) -> {ok, key(), value()} | not_found | {error, term()}. cursor_prev(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_nif/3, [Cursor]). --spec cursor_prev_nif(reference(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. -cursor_prev_nif(_AsyncRef, _Cursor) -> +-spec cursor_prev_nif(reference(), non_neg_integer(), cursor()) -> {ok, key(), value()} | not_found | {error, term()}. +cursor_prev_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_prev_key(cursor()) -> {ok, key()} | not_found | {error, term()}. cursor_prev_key(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_key_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_key_nif/3, [Cursor]). --spec cursor_prev_key_nif(reference(), cursor()) -> {ok, key()} | not_found | {error, term()}. -cursor_prev_key_nif(_AsyncRef, _Cursor) -> +-spec cursor_prev_key_nif(reference(), non_neg_integer(), cursor()) -> {ok, key()} | not_found | {error, term()}. +cursor_prev_key_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_prev_value(cursor()) -> {ok, value()} | not_found | {error, term()}. cursor_prev_value(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_prev_value_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_prev_value_nif/3, [Cursor]). --spec cursor_prev_value_nif(reference(), cursor()) -> {ok, value()} | not_found | {error, term()}. -cursor_prev_value_nif(_AsyncRef, _Cursor) -> +-spec cursor_prev_value_nif(reference(), non_neg_integer(), cursor()) -> {ok, value()} | not_found | {error, term()}. +cursor_prev_value_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_search(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_search_nif/3, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_search_nif/4, [Cursor, Key]). --spec cursor_search_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search_nif(_AsyncRef, _Cursor, _Key) -> +-spec cursor_search_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> ?nif_stub. -spec cursor_search_near(cursor(), key()) -> {ok, value()} | {error, term()}. cursor_search_near(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_search_near_nif/3, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_search_near_nif/4, [Cursor, Key]). --spec cursor_search_near_nif(reference(), cursor(), key()) -> {ok, value()} | {error, term()}. -cursor_search_near_nif(_AsyncRef, _Cursor, _Key) -> +-spec cursor_search_near_nif(reference(), non_neg_integer(), cursor(), key()) -> {ok, value()} | {error, term()}. +cursor_search_near_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> ?nif_stub. -spec cursor_reset(cursor()) -> ok | {error, term()}. cursor_reset(Cursor) -> - ?ASYNC_NIF_CALL(fun cursor_reset_nif/2, [Cursor]). + ?ASYNC_NIF_CALL(fun cursor_reset_nif/3, [Cursor]). --spec cursor_reset_nif(reference(), cursor()) -> ok | {error, term()}. -cursor_reset_nif(_AsyncRef, _Cursor) -> +-spec cursor_reset_nif(reference(), non_neg_integer(), cursor()) -> ok | {error, term()}. +cursor_reset_nif(_AsyncRef, _SchedulerId, _Cursor) -> ?nif_stub. -spec cursor_insert(cursor(), key(), value()) -> ok | {error, term()}. cursor_insert(Cursor, Key, Value) -> - ?ASYNC_NIF_CALL(fun cursor_insert_nif/4, [Cursor, Key, Value]). + ?ASYNC_NIF_CALL(fun cursor_insert_nif/5, [Cursor, Key, Value]). --spec cursor_insert_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. -cursor_insert_nif(_AsyncRef, _Cursor, _Key, _Value) -> +-spec cursor_insert_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_insert_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_update(cursor(), key(), value()) -> ok | {error, term()}. cursor_update(Cursor, Key, Value) -> - ?ASYNC_NIF_CALL(fun cursor_update_nif/4, [Cursor, Key, Value]). + ?ASYNC_NIF_CALL(fun cursor_update_nif/5, [Cursor, Key, Value]). --spec cursor_update_nif(reference(), cursor(), key(), value()) -> ok | {error, term()}. -cursor_update_nif(_AsyncRef, _Cursor, _Key, _Value) -> +-spec cursor_update_nif(reference(), non_neg_integer(), cursor(), key(), value()) -> ok | {error, term()}. +cursor_update_nif(_AsyncRef, _SchedulerId, _Cursor, _Key, _Value) -> ?nif_stub. -spec cursor_remove(cursor(), key()) -> ok | {error, term()}. cursor_remove(Cursor, Key) -> - ?ASYNC_NIF_CALL(fun cursor_remove_nif/3, [Cursor, Key]). + ?ASYNC_NIF_CALL(fun cursor_remove_nif/4, [Cursor, Key]). --spec cursor_remove_nif(reference(), cursor(), key()) -> ok | {error, term()}. -cursor_remove_nif(_AsyncRef, _Cursor, _Key) -> +-spec cursor_remove_nif(reference(), non_neg_integer(), cursor(), key()) -> ok | {error, term()}. +cursor_remove_nif(_AsyncRef, _SchedulerId, _Cursor, _Key) -> ?nif_stub. -type fold_keys_fun() :: fun((Key::binary(), any()) -> any()).