diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 8f13913..87bd3a6 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -34,9 +34,18 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS +/* Atoms (initialized in on_load) */ +static ERL_NIF_TERM ATOM_EAGAIN; +static ERL_NIF_TERM ATOM_ENOMEM; +static ERL_NIF_TERM ATOM_ENQUEUED; +static ERL_NIF_TERM ATOM_ERROR; +static ERL_NIF_TERM ATOM_OK; +static ERL_NIF_TERM ATOM_SHUTDOWN; + + struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -53,6 +62,7 @@ struct async_nif_work_queue { unsigned int depth; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; + struct async_nif_work_queue *next; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; }; @@ -103,25 +113,20 @@ struct async_nif_state { argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) { \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "shutdown")); \ - } \ + if (async_nif->shutdown) \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \ req = async_nif_reuse_req(async_nif); \ - if (!req) { \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ - } \ + if (!req) \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \ new_env = req->env; \ DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \ - copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ + copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "enomem")); \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \ } \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ req->ref = enif_make_copy(new_env, argv_in[0]); \ @@ -136,9 +141,8 @@ struct async_nif_state { if (!reply) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ - enif_free(copy_of_args); \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + free(copy_of_args); \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \ } \ return reply; \ } @@ -146,16 +150,16 @@ struct async_nif_state { #define ASYNC_NIF_INIT(name) \ static ErlNifMutex *name##_async_nif_coord = NULL; -#define ASYNC_NIF_LOAD(name, priv) do { \ +#define ASYNC_NIF_LOAD(name, env, priv) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord load"); \ enif_mutex_lock(name##_async_nif_coord); \ - priv = async_nif_load(); \ + priv = async_nif_load(env); \ enif_mutex_unlock(name##_async_nif_coord); \ } while(0); #define ASYNC_NIF_UNLOAD(name, env, priv) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_unload(env, priv); \ enif_mutex_unlock(name##_async_nif_coord); \ @@ -164,7 +168,7 @@ struct async_nif_state { } while(0); #define ASYNC_NIF_UPGRADE(name, env) do { \ if (!name##_async_nif_coord) \ - name##_async_nif_coord = enif_mutex_create(NULL); \ + name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_upgrade(env); \ enif_mutex_unlock(name##_async_nif_coord); \ @@ -191,15 +195,15 @@ async_nif_reuse_req(struct async_nif_state *async_nif) enif_mutex_lock(async_nif->recycled_req_mutex); if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { - req = enif_alloc(sizeof(struct async_nif_req_entry)); + req = malloc(sizeof(struct async_nif_req_entry)); if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); if (env) { req->env = env; - async_nif->num_reqs++; + __sync_fetch_and_add(&async_nif->num_reqs, 1); } else { - enif_free(req); + free(req); req = NULL; } } @@ -253,7 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); void *exit_value = 0; /* We ignore the thread_join's exit value. */ enif_thread_join(we->tid, &exit_value); - enif_free(we); + free(we); async_nif->we_active--; we = n; } @@ -263,7 +267,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ return EAGAIN; } - we = enif_alloc(sizeof(struct async_nif_worker_entry)); + we = malloc(sizeof(struct async_nif_worker_entry)); if (!we) { enif_mutex_unlock(async_nif->we_mutex); return ENOMEM; @@ -287,7 +291,7 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int i, qid = 0; + unsigned int i, last_qid, qid = 0; struct async_nif_work_queue *q = NULL; double avg_depth = 0.0; @@ -297,9 +301,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en if (hint >= 0) { qid = (unsigned int)hint; } else { - qid = async_nif->next_q; - qid = (qid + 1) % async_nif->num_queues; - async_nif->next_q = qid; + do { + last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); + qid = (last_qid + 1) % async_nif->num_queues; + } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid)); } /* Now we inspect and interate across the set of queues trying to select one @@ -314,18 +319,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en avg_depth += async_nif->queues[j].depth; } } - if (avg_depth != 0) - avg_depth /= n; + if (avg_depth) avg_depth /= n; /* Lock this queue under consideration, then check for shutdown. While we hold this lock either a) we're shutting down so exit now or b) this queue will be valid until we release the lock. */ q = &async_nif->queues[qid]; enif_mutex_lock(q->reqs_mutex); - if (async_nif->shutdown) { - enif_mutex_unlock(q->reqs_mutex); - return 0; - } /* Try not to enqueue a request into a queue that isn't keeping up with the request volume. */ @@ -337,26 +337,31 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en } /* If the for loop finished then we didn't find a suitable queue for this - request, meaning we're backed up so trigger eagain. */ - if (i == async_nif->num_queues) { - enif_mutex_unlock(q->reqs_mutex); - return 0; - } + request, meaning we're backed up so trigger eagain. Note that if we left + the loop in this way we hold no lock. */ + if (i == async_nif->num_queues) return 0; /* Add the request to the queue. */ STAILQ_INSERT_TAIL(&q->reqs, req, entries); - q->depth++; + __sync_fetch_and_add(&q->depth, 1); /* We've selected a queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ - if (q->depth > q->num_workers) - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; + while (q->depth > q->num_workers) { + switch(async_nif_start_worker(async_nif, q)) { + case EINVAL: case ENOMEM: default: return 0; + case EAGAIN: continue; + case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done; + } + }done:; /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ - ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_atom(req->env, "enqueued")); + double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE; + ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK, + enif_make_tuple2(req->env, ATOM_ENQUEUED, + enif_make_double(req->env, pct_full))); enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); return reply; @@ -375,6 +380,7 @@ async_nif_worker_fn(void *arg) struct async_nif_state *async_nif = we->async_nif; struct async_nif_work_queue *q = we->q; struct async_nif_req_entry *req = NULL; + unsigned int tries = async_nif->num_queues; for(;;) { /* Examine the request queue, are there things to be done? */ @@ -386,22 +392,33 @@ async_nif_worker_fn(void *arg) } if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { - enif_mutex_unlock(q->reqs_mutex); - break; - } else { - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; - } + enif_mutex_unlock(q->reqs_mutex); + if (tries == 0 && q == we->q) { + if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { + /* At this point we've tried to find/execute work on all queues + * and there are at least MIN_WORKERS on this queue so we + * leaving this loop (break) which leads to a thread exit/join. */ + break; + } else { + enif_mutex_lock(q->reqs_mutex); + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; + } + } else { + tries--; + __sync_fetch_and_add(&q->num_workers, -1); + q = q->next; + __sync_fetch_and_add(&q->num_workers, 1); + continue; // try next queue + } } else { /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ req = STAILQ_FIRST(&q->reqs); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); - q->depth--; + __sync_fetch_and_add(&q->depth, -1); - /* Ensure that there is at least one other worker thread watching this - queue. */ + /* Wake up other worker thread watching this queue to help process work. */ enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); @@ -415,7 +432,7 @@ async_nif_worker_fn(void *arg) req->ref = 0; req->fn_work = 0; req->fn_post = 0; - enif_free(req->args); + free(req->args); req->args = NULL; async_nif_recycle_req(req, async_nif); req = NULL; @@ -424,7 +441,7 @@ async_nif_worker_fn(void *arg) enif_mutex_lock(async_nif->we_mutex); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); enif_mutex_unlock(async_nif->we_mutex); - q->num_workers--; + __sync_fetch_and_add(&q->num_workers, -1); enif_thread_exit(0); return 0; } @@ -467,7 +484,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); void *exit_value = 0; /* We ignore the thread_join's exit value. */ enif_thread_join(we->tid, &exit_value); - enif_free(we); + free(we); async_nif->we_active--; we = n; } @@ -486,12 +503,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, - enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), - enif_make_atom(req->env, "shutdown"))); + enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN)); req->fn_post(req->args); enif_free_env(req->env); - enif_free(req->args); - enif_free(req); + free(req->args); + free(req); req = n; } enif_mutex_destroy(q->reqs_mutex); @@ -505,18 +521,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) while(req != NULL) { struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_free_env(req->env); - enif_free(req); + free(req); req = n; } enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); - enif_free(async_nif); + free(async_nif); } static void * -async_nif_load() +async_nif_load(ErlNifEnv *env) { static int has_init = 0; unsigned int i, num_queues; @@ -527,6 +543,14 @@ async_nif_load() if (has_init) return 0; else has_init = 1; + /* Init some static references to commonly used atoms. */ + ATOM_EAGAIN = enif_make_atom(env, "eagain"); + ATOM_ENOMEM = enif_make_atom(env, "enomem"); + ATOM_ENQUEUED = enif_make_atom(env, "enqueued"); + ATOM_ERROR = enif_make_atom(env, "error"); + ATOM_OK = enif_make_atom(env, "ok"); + ATOM_SHUTDOWN = enif_make_atom(env, "shutdown"); + /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); @@ -544,8 +568,8 @@ async_nif_load() } /* Init our portion of priv_data's module-specific state. */ - async_nif = enif_alloc(sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + async_nif = malloc(sizeof(struct async_nif_state) + + sizeof(struct async_nif_work_queue) * num_queues); if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + @@ -556,15 +580,16 @@ async_nif_load() async_nif->next_q = 0; async_nif->shutdown = 0; STAILQ_INIT(&async_nif->recycled_reqs); - async_nif->recycled_req_mutex = enif_mutex_create(NULL); - async_nif->we_mutex = enif_mutex_create(NULL); + async_nif->recycled_req_mutex = enif_mutex_create("recycled_req"); + async_nif->we_mutex = enif_mutex_create("we"); SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); - q->reqs_mutex = enif_mutex_create(NULL); - q->reqs_cnd = enif_cond_create(NULL); + q->reqs_mutex = enif_mutex_create("reqs"); + q->reqs_cnd = enif_cond_create("reqs"); + q->next = &async_nif->queues[(i + 1) % num_queues]; } return async_nif; } diff --git a/c_src/lmdb_nif.c b/c_src/lmdb_nif.c index a24afe3..38feeea 100644 --- a/c_src/lmdb_nif.c +++ b/c_src/lmdb_nif.c @@ -615,7 +615,7 @@ static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) /* Note: !!! the first element of our priv_data struct *must* be the pointer to the async_nif's private data which we set here. */ - ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); + ASYNC_NIF_LOAD(lmdb, env, priv->async_nif_priv); if (!priv) return ENOMEM; *priv_data = priv; diff --git a/src/async_nif.hrl b/src/async_nif.hrl index de4f592..5492e97 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -1,43 +1,54 @@ -%% --------------------------------------------------------------------------- +%% ------------------------------------------------------------------- %% %% async_nif: An async thread-pool layer for Erlang's NIF API %% -%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. %% Author: Gregory Burd %% -%% This file is provided to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at: +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -%% License for the specific language governing permissions and limitations +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations %% under the License. %% -%% --------------------------------------------------------------------------- +%% ------------------------------------------------------------------- --spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}. -async_nif_enqueue(R, F, A) -> - case erlang:apply(F, [R|A]) of - {ok, enqueued} -> - receive - {R, {error, eagain}} -> - %% Work unit was not queued, try again. - async_nif_enqueue(R, F, A); - {R, {error, shutdown}=Error} -> - %% Work unit was queued, but not executed. - Error; - {R, {error, _Reason}=Error} -> - %% Work unit returned an error. - Error; - {R, Reply} -> - Reply - end; - Other -> - Other - end. - --define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(erlang:make_ref(), Fun, Args)). +-define(ASYNC_NIF_CALL(Fun, Args), + F = fun(F, T) -> + R = erlang:make_ref(), + case erlang:apply(Fun, [R|Args]) of + {ok, {enqueued, PctBusy}} -> + if + PctBusy > 0.25 andalso PctBusy =< 1.0 -> + erlang:bump_reductions(erlang:trunc(2000 * PctBusy)); + true -> + ok + end, + receive + {R, {error, shutdown}=Error} -> + %% Work unit was queued, but not executed. + Error; + {R, {error, _Reason}=Error} -> + %% Work unit returned an error. + Error; + {R, Reply} -> + Reply + end; + {error, eagain} -> + case T of + 3 -> not_found; + _ -> F(F, T + 1) + end; + Other -> + Other + end + end, + F(F, 1)).