/* * async_nif: An async thread-pool layer for Erlang's NIF API * * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. * Author: Gregory Burd * * This file is provided to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ #ifndef __ASYNC_NIF_H__ #define __ASYNC_NIF_H__ #if defined(__cplusplus) extern "C" { #endif #include #include "queue.h" #include "stats.h" #ifndef UNUSED #define UNUSED(v) ((void)(v)) #endif #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; ErlNifPid pid; void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); STAILQ_ENTRY(async_nif_req_entry) entries; }; struct async_nif_work_queue { unsigned int num_workers; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; STAT_DEF(qwait); STAT_DEF(wt); }; struct async_nif_worker_entry { ErlNifTid tid; unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; SLIST_ENTRY(async_nif_worker_entry) entries; }; struct async_nif_state { unsigned int shutdown; ErlNifMutex *we_mutex; unsigned int we_active; SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; unsigned int num_reqs; ErlNifMutex *recycled_req_mutex; struct async_nif_work_queue queues[]; }; #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ UNUSED(worker_id); \ DPRINTF("async_nif: calling \"%s\"", __func__); \ do work_block while(0); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \ } \ static void fn_post_ ## decl (struct decl ## _args *args) { \ UNUSED(args); \ DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \ do post_block while(0); \ DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ struct decl ## _args on_stack_args; \ struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ req = async_nif_reuse_req(async_nif); \ if (!req) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ } \ new_env = req->env; \ DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ req->ref = enif_make_copy(new_env, argv_in[0]); \ enif_self(env, &req->pid); \ req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ int h = -1; \ if (affinity) \ h = ((unsigned int)affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ } \ return reply; \ } #define ASYNC_NIF_INIT(name) \ static ErlNifMutex *name##_async_nif_coord = NULL; #define ASYNC_NIF_LOAD(name, priv) do { \ if (!name##_async_nif_coord) \ name##_async_nif_coord = enif_mutex_create(NULL); \ enif_mutex_lock(name##_async_nif_coord); \ priv = async_nif_load(); \ enif_mutex_unlock(name##_async_nif_coord); \ } while(0); #define ASYNC_NIF_UNLOAD(name, env, priv) do { \ if (!name##_async_nif_coord) \ name##_async_nif_coord = enif_mutex_create(NULL); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_unload(env, priv); \ enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_destroy(name##_async_nif_coord); \ name##_async_nif_coord = NULL; \ } while(0); #define ASYNC_NIF_UPGRADE(name, env) do { \ if (!name##_async_nif_coord) \ name##_async_nif_coord = enif_mutex_create(NULL); \ enif_mutex_lock(name##_async_nif_coord); \ async_nif_upgrade(env); \ enif_mutex_unlock(name##_async_nif_coord); \ } while(0); #define ASYNC_NIF_RETURN_BADARG() do { \ async_nif_recycle_req(req, async_nif); \ return enif_make_badarg(env); \ } while(0); #define ASYNC_NIF_WORK_ENV new_env #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) /** * Return a request structure from the recycled req queue if one exists, * otherwise create one. */ struct async_nif_req_entry * async_nif_reuse_req(struct async_nif_state *async_nif) { struct async_nif_req_entry *req = NULL; ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { req = enif_alloc(sizeof(struct async_nif_req_entry)); if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); if (env) { req->env = env; async_nif->num_reqs++; } else { enif_free(req); req = NULL; } } } } else { req = STAILQ_FIRST(&async_nif->recycled_reqs); STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries); } enif_mutex_unlock(async_nif->recycled_req_mutex); return req; } /** * Store the request for future re-use. * * req a request entry with an ErlNifEnv* which will be cleared * before reuse, but not until then. * async_nif a handle to our state so that we can find and use the mutex */ void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); enif_clear_env(req->env); env = req->env; memset(req, 0, sizeof(struct async_nif_req_entry)); req->env = env; STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries); enif_mutex_unlock(async_nif->recycled_req_mutex); } static void *async_nif_worker_fn(void *); /** * Start up a worker thread. */ static int async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) { struct async_nif_worker_entry *we; if (0 == q) return EINVAL; enif_mutex_lock(async_nif->we_mutex); #if 0 // TODO: we = SLIST_FIRST(&async_nif->we_joining); while(we != NULL) { struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); void *exit_value = 0; /* We ignore the thread_join's exit value. */ enif_thread_join(we->tid, &exit_value); enif_free(we); async_nif->we_active--; we = n; } #endif if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { enif_mutex_unlock(async_nif->we_mutex); return EAGAIN; } we = enif_alloc(sizeof(struct async_nif_worker_entry)); if (!we) { enif_mutex_unlock(async_nif->we_mutex); return ENOMEM; } memset(we, 0, sizeof(struct async_nif_worker_entry)); we->worker_id = async_nif->we_active++; we->async_nif = async_nif; we->q = q; enif_mutex_unlock(async_nif->we_mutex); return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0); } /** * Enqueue a request for processing by a worker thread. * * Places the request into a work queue determined either by the * provided affinity or by iterating through the available queues. */ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ unsigned int i, qid = 0; unsigned int n = async_nif->num_queues; struct async_nif_work_queue *q = NULL; double avg_wait_across_q, avg_wt_service_time, avg_wait_this_q = 0; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that global value (next_q is shared across worker threads) . */ if (hint >= 0) { qid = (unsigned int)hint; } else { qid = async_nif->next_q; qid = (qid + 1) % async_nif->num_queues; async_nif->next_q = qid; } avg_wait_across_q = 0; for (i = 0; i < async_nif->num_queues; i++) { avg_wait_across_q += __stat_mean(async_nif->queues[i].qwait_stat); } if (avg_wait_across_q != 0) avg_wait_across_q /= async_nif->num_queues; /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ do { q = &async_nif->queues[qid]; enif_mutex_lock(q->reqs_mutex); /* Now that we hold the lock, check for shutdown. As long as we hold this lock either a) we're shutting down so exit now or b) this queue will be valid until we release the lock. */ if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); return 0; } avg_wait_this_q = __stat_mean(q->qwait_stat); avg_wt_service_time = __stat_mean(q->wt_stat); DPRINTF("q:%d w:%u %f/%f(%f) %c", qid, q->num_workers, avg_wait_this_q, avg_wait_across_q, avg_wt_service_time, avg_wait_this_q <= avg_wait_across_q ? 't' : 'f'); if (avg_wait_this_q <= avg_wait_across_q) break; else { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; q = &async_nif->queues[qid]; } } while(n-- > 0); if (n == 0) return 0; // All queues are full, trigger eagain /* We hold the queue's lock, and we've seletect a reasonable queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ if (q->num_workers == 0 || avg_wait_this_q >= avg_wt_service_time) { if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; } /* And finally add the request to the queue. */ __stat_tick(q->qwait_stat); STAILQ_INSERT_TAIL(&q->reqs, req, entries); /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), enif_make_atom(req->env, "enqueued")); enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); return reply; } /** * Worker threads execute this function. Here each worker pulls requests of * their respective queues, executes that work and continues doing that until * they see the shutdown flag is set at which point they exit. */ static void * async_nif_worker_fn(void *arg) { struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg; unsigned int worker_id = we->worker_id; struct async_nif_state *async_nif = we->async_nif; struct async_nif_work_queue *q = we->q; struct async_nif_req_entry *req = NULL; for(;;) { /* Examine the request queue, are there things to be done? */ enif_mutex_lock(q->reqs_mutex); check_again_for_work: if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); break; } if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ if (q->num_workers > 2) { enif_mutex_unlock(q->reqs_mutex); break; } else { enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; } } else { /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ req = STAILQ_FIRST(&q->reqs); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); /* Ensure that there is at least one other worker thread watching this queue. */ enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); __stat_tock(q->qwait_stat); /* Perform the work. */ __stat_tick(q->wt_stat); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); __stat_tock(q->wt_stat); /* Now call the post-work cleanup function. */ req->fn_post(req->args); /* Clean up req for reuse. */ req->ref = 0; req->fn_work = 0; req->fn_post = 0; enif_free(req->args); req->args = NULL; async_nif_recycle_req(req, async_nif); req = NULL; } } enif_mutex_lock(async_nif->we_mutex); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); enif_mutex_unlock(async_nif->we_mutex); q->num_workers--; if (q->num_workers == 0) { __stat_reset(q->qwait_stat); __stat_reset(q->wt_stat); } enif_thread_exit(0); return 0; } static void async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) { unsigned int i; unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; struct async_nif_req_entry *req = NULL; struct async_nif_worker_entry *we = NULL; UNUSED(env); /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first lock all the worker queues, then set shutdown to true, then unlock. The enqueue function will take the queue mutex, then test for shutdown condition, then enqueue only if not shutting down. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; enif_mutex_lock(q->reqs_mutex); } /* Set the shutdown flag so that worker threads will no continue executing requests. */ async_nif->shutdown = 1; for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl q-wait"); enif_mutex_unlock(q->reqs_mutex); } __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl service time"); /* Join for the now exiting worker threads. */ while(async_nif->we_active > 0) { for (i = 0; i < num_queues; i++) enif_cond_broadcast(async_nif->queues[i].reqs_cnd); we = SLIST_FIRST(&async_nif->we_joining); while(we != NULL) { struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); void *exit_value = 0; /* We ignore the thread_join's exit value. */ enif_thread_join(we->tid, &exit_value); enif_free(we); async_nif->we_active--; we = n; } } enif_mutex_destroy(async_nif->we_mutex); /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; /* Worker threads are stopped, now toss anything left in the queue. */ req = NULL; req = STAILQ_FIRST(&q->reqs); while(req != NULL) { struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_atom(req->env, "shutdown"))); req->fn_post(req->args); enif_free_env(req->env); enif_free(req->args); enif_free(req); req = n; } enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } /* Free any req structures sitting unused on the recycle queue. */ enif_mutex_lock(async_nif->recycled_req_mutex); req = NULL; req = STAILQ_FIRST(&async_nif->recycled_reqs); while(req != NULL) { struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_free_env(req->env); enif_free(req); req = n; } enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); enif_free(async_nif); } static void * async_nif_load() { static int has_init = 0; unsigned int i, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; /* Don't init more than once. */ if (has_init) return 0; else has_init = 1; /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); /* Size the number of work queues according to schedulers. */ if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) { num_queues = ASYNC_NIF_MAX_WORKERS / 2; } else { int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads; if (remainder != 0) num_queues = info.scheduler_threads - remainder; else num_queues = info.scheduler_threads; if (num_queues < 2) num_queues = 2; } /* Init our portion of priv_data's module-specific state. */ async_nif = enif_alloc(sizeof(struct async_nif_state) + sizeof(struct async_nif_work_queue) * num_queues); if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; STAILQ_INIT(&async_nif->recycled_reqs); async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create(NULL); SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); q->qwait_stat = __stat_init(100000); q->wt_stat = __stat_init(100000); } return async_nif; } static void async_nif_upgrade(ErlNifEnv *env) { UNUSED(env); // TODO: } #if defined(__cplusplus) } #endif #endif // __ASYNC_NIF_H__