diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 026a38f..4fdad10 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -131,6 +131,7 @@ struct async_nif_state { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req; \ + const char *affinity = NULL; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ @@ -164,8 +165,11 @@ struct async_nif_state { enif_self(env, &req->pid); \ req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ - req->fn_post = (void (*)(void *))fn_post_ ## decl; \ - return async_nif_enqueue_req(async_nif, req); \ + req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + unsigned int h = 0; \ + if (affinity) \ + h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ + return async_nif_enqueue_req(async_nif, req, h); \ } #define ASYNC_NIF_INIT(name) \ @@ -201,9 +205,20 @@ struct async_nif_state { #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) /** + * TODO: + */ +static inline unsigned int async_nif_str_hash_func(const char *s) +{ + unsigned int h = (unsigned int)*s; + if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; + return h; +} + +/** + * TODO: */ static ERL_NIF_TERM -async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) +async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, unsigned int hint) { /* If we're shutting down return an error term and ignore the request. */ if (async_nif->shutdown) { @@ -215,7 +230,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return reply; } - unsigned int qid = async_nif->next_q; // Keep a local to avoid the race. + unsigned int qid = hint ? hint : async_nif->next_q; // Keep a local to avoid the race. struct async_nif_work_queue *q = &async_nif->queues[qid]; while (fifo_q_size(reqs, q->reqs) == fifo_q_capacity(reqs, q->reqs)) { qid = (qid + 1) % async_nif->num_queues; diff --git a/c_src/wterl.c b/c_src/wterl.c index 7e93255..253359e 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -700,6 +700,7 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[4]); enif_keep_resource((void*)args->conn_handle); + affinity = args->uri; }, { // work @@ -979,6 +980,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); + affinity = args->uri; }, { // work @@ -1040,6 +1042,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); + affinity = args->uri; }, { // work @@ -1119,6 +1122,7 @@ ASYNC_NIF_DECL( args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); enif_keep_resource((void*)args->conn_handle); + affinity = args->uri; }, { // work @@ -1189,6 +1193,7 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); + affinity = args->uri; }, { // work