2012-11-25 20:42:18 +00:00
|
|
|
/*
|
|
|
|
*
|
|
|
|
* async_nif: An async thread-pool layer for Erlang's NIF API
|
|
|
|
*
|
|
|
|
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
|
|
|
|
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
|
|
|
|
*
|
|
|
|
* This file is provided to you under the Apache License,
|
|
|
|
* Version 2.0 (the "License"); you may not use this file
|
|
|
|
* except in compliance with the License. You may obtain
|
|
|
|
* a copy of the License at
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing,
|
|
|
|
* software distributed under the License is distributed on an
|
|
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
* KIND, either express or implied. See the License for the
|
|
|
|
* specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
|
|
|
|
#ifndef __ASYNC_NIF_H__
|
|
|
|
#define __ASYNC_NIF_H__
|
|
|
|
|
|
|
|
#if defined(__cplusplus)
|
|
|
|
extern "C" {
|
|
|
|
#endif
|
|
|
|
|
|
|
|
/*
 * ASYNC_NIF_MAX_WORKERS is the number of threads in the worker pool.
 * Define it in your NIF implementation before including this file to
 * override the default of 16.  The operating system may cap the number of
 * threads a process can create.  For instance, to allow 1,000,000 threads
 * on a Linux system you must run
 *
 *     echo 1000000 > /proc/sys/kernel/threads-max
 *
 * before launching the process, and on all UNIX systems the ulimit
 * maximums apply as well.
 */
#if !defined(ASYNC_NIF_MAX_WORKERS)
#define ASYNC_NIF_MAX_WORKERS 16
#endif
|
|
|
|
|
|
|
|
#include "queue.h"
|
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
struct async_nif_req_entry {
|
2012-11-25 20:42:18 +00:00
|
|
|
ERL_NIF_TERM ref, *argv;
|
2012-11-29 16:37:36 +00:00
|
|
|
ErlNifEnv *env;
|
|
|
|
ErlNifPid pid;
|
2012-11-25 20:42:18 +00:00
|
|
|
void *args;
|
|
|
|
void *priv_data;
|
|
|
|
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, void *, ErlNifPid*, void *);
|
|
|
|
void (*fn_post)(void *);
|
2012-11-29 16:37:36 +00:00
|
|
|
STAILQ_ENTRY(async_nif_req_entry) entries;
|
2012-11-25 20:42:18 +00:00
|
|
|
};
|
2012-11-29 16:37:36 +00:00
|
|
|
STAILQ_HEAD(reqs, async_nif_req_entry) async_nif_reqs = STAILQ_HEAD_INITIALIZER(async_nif_reqs);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
struct async_nif_worker_entry {
|
2012-11-25 20:42:18 +00:00
|
|
|
ErlNifTid tid;
|
2012-11-29 16:37:36 +00:00
|
|
|
LIST_ENTRY(async_nif_worker_entry) entries;
|
2012-11-25 20:42:18 +00:00
|
|
|
};
|
2012-11-29 16:37:36 +00:00
|
|
|
LIST_HEAD(idle_workers, async_nif_worker_entry) async_nif_idle_workers = LIST_HEAD_INITIALIZER(async_nif_worker);
|
|
|
|
|
|
|
|
static volatile unsigned int async_nif_req_count = 0;
|
|
|
|
static volatile unsigned int async_nif_shutdown = 0;
|
|
|
|
static ErlNifMutex *async_nif_req_mutex = NULL;
|
|
|
|
static ErlNifMutex *async_nif_worker_mutex = NULL;
|
|
|
|
static ErlNifCond *async_nif_cnd = NULL;
|
|
|
|
static struct async_nif_worker_entry async_nif_worker_entries[ASYNC_NIF_MAX_WORKERS];
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
|
|
|
|
/*
 * ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block)
 *
 * Expands to everything needed for one asynchronous NIF named `decl`:
 *
 *   struct decl_args frame;   - `frame` is the brace-enclosed member list
 *                               of the per-call argument struct.
 *   fn_work_decl()            - body is `work_block`; called by a worker
 *                               thread with `env`, `ref`, `priv_data`,
 *                               `pid` and `args` in scope.
 *   fn_post_decl()            - runs `post_block` with `args` in scope.
 *   decl()                    - the NIF entry point itself.
 *
 * The entry point treats argv_in[0] as the reference used to tag the
 * reply and exposes the remaining arguments to `pre_block` as `argv` /
 * `argc`.  `pre_block` runs in the calling thread and fills `args` (which
 * points at an on-stack struct); terms that must outlive the call belong
 * in ASYNC_NIF_WORK_ENV.  The filled struct is then copied to the heap,
 * wrapped in a request and handed to async_nif_enqueue_req().
 *
 * Return value of the generated NIF:
 *   {error, shutdown}         - the pool has been shut down
 *   {error, enomem}           - an allocation failed (fn_post_decl() has
 *                               already been run on the on-stack args)
 *   {ok, {enqueued, Count}}   - request queued; Count is the value of
 *                               async_nif_req_count read after enqueueing
 *
 * When the copy of the argument struct cannot be allocated, the request
 * entry allocated just before it is released as well, so that path does
 * not leak.
 */
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
  struct decl ## _args frame; \
  static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, void *priv_data, ErlNifPid *pid, struct decl ## _args *args) work_block \
  static void fn_post_ ## decl (struct decl ## _args *args) { \
    do post_block while(0); \
  } \
  static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
    struct decl ## _args on_stack_args; \
    struct decl ## _args *args = &on_stack_args; \
    struct decl ## _args *copy_of_args; \
    struct async_nif_req_entry *req = NULL; \
    ErlNifEnv *new_env = NULL; \
    /* argv[0] is a ref used for selective recv */ \
    const ERL_NIF_TERM *argv = argv_in + 1; \
    argc--; \
    if (async_nif_shutdown) \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "shutdown")); \
    if (!(new_env = enif_alloc_env())) { \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "enomem")); \
    } \
    do pre_block while(0); \
    req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
    if (!req) { \
      fn_post_ ## decl (args); \
      enif_free_env(new_env); \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "enomem")); \
    } \
    copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
    if (!copy_of_args) { \
      fn_post_ ## decl (args); \
      enif_free(req); \
      enif_free_env(new_env); \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "enomem")); \
    } \
    memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
    req->env = new_env; \
    req->ref = enif_make_copy(new_env, argv_in[0]); \
    enif_self(env, &req->pid); \
    req->args = (void*)copy_of_args; \
    req->priv_data = enif_priv_data(env); \
    req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, void*, ErlNifPid*, void *))fn_work_ ## decl ; \
    req->fn_post = (void (*)(void *))fn_post_ ## decl; \
    async_nif_enqueue_req(req); \
    return enif_make_tuple2(env, enif_make_atom(env, "ok"), \
                            enif_make_tuple2(env, enif_make_atom(env, "enqueued"), \
                                             enif_make_int(env, async_nif_req_count))); \
  }
|
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
/* Start the worker pool; makes the enclosing function return -1 when
   async_nif_init() reports a failure (any non-zero result). */
#define ASYNC_NIF_LOAD() if (async_nif_init()) return -1;

/* Stop the worker pool and release everything async_nif_init() created. */
#define ASYNC_NIF_UNLOAD() async_nif_unload();

/* An upgrade performs exactly the same teardown as an unload. */
#define ASYNC_NIF_UPGRADE() ASYNC_NIF_UNLOAD()
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
/* The environment that travels with the request.  Inside the pre_block of
   ASYNC_NIF_DECL() this is the freshly allocated `new_env`; terms that the
   work block needs later must be created in or copied into it. */
#define ASYNC_NIF_WORK_ENV new_env

/* Bail out of the pre_block of ASYNC_NIF_DECL() with a badarg exception.
   At that point the work environment has been allocated but no request
   owns it yet, so it is freed here before returning; otherwise every
   rejected call would leak one environment.  The trailing semicolon is
   kept so existing call sites, with or without their own `;`, still
   compile. */
#define ASYNC_NIF_RETURN_BADARG() do { \
    enif_free_env(ASYNC_NIF_WORK_ENV); \
    return enif_make_badarg(env); \
  } while(0);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
/*
 * Send `msg` back to the calling process as the tuple {ref, msg}.  Meant
 * for use inside a work block, where `env`, `ref` and `pid` are the
 * parameters of the generated fn_work_<decl>() function.  When
 * PULSE_FORCE_USING_PULSE_SEND_HERE is defined the message goes through
 * PULSE_SEND instead of enif_send.
 */
#ifdef PULSE_FORCE_USING_PULSE_SEND_HERE
#define ASYNC_NIF_REPLY(msg) PULSE_SEND(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#else
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#endif
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
static void async_nif_enqueue_req(struct async_nif_req_entry *r)
|
2012-11-25 20:42:18 +00:00
|
|
|
{
|
|
|
|
/* Add the request to the work queue. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_req_mutex);
|
|
|
|
STAILQ_INSERT_TAIL(&async_nif_reqs, r, entries);
|
|
|
|
async_nif_req_count++;
|
|
|
|
enif_mutex_unlock(async_nif_req_mutex);
|
|
|
|
enif_cond_broadcast(async_nif_cnd);
|
2012-11-25 20:42:18 +00:00
|
|
|
}
|
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
static void *async_nif_worker_fn(void *arg)
|
2012-11-25 20:42:18 +00:00
|
|
|
{
|
2012-11-29 16:37:36 +00:00
|
|
|
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg;
|
|
|
|
struct async_nif_req_entry *req = NULL;
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/*
|
|
|
|
* Workers are active while there is work on the queue to do and
|
|
|
|
* only in the idle list when they are waiting on new work.
|
|
|
|
*/
|
|
|
|
for(;;) {
|
|
|
|
/* Examine the request queue, are there things to be done? */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_req_mutex);
|
|
|
|
enif_mutex_lock(async_nif_worker_mutex);
|
|
|
|
LIST_INSERT_HEAD(&async_nif_idle_workers, worker, entries);
|
|
|
|
enif_mutex_unlock(async_nif_worker_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
check_again_for_work:
|
2012-11-29 16:37:36 +00:00
|
|
|
if (async_nif_shutdown) { enif_mutex_unlock(async_nif_req_mutex); break; }
|
|
|
|
if ((req = STAILQ_FIRST(&async_nif_reqs)) == NULL) {
|
2012-11-25 20:42:18 +00:00
|
|
|
/* Queue is empty, join the list of idle workers and wait for work */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_cond_wait(async_nif_cnd, async_nif_req_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
goto check_again_for_work;
|
|
|
|
} else {
|
|
|
|
/* `req` is our work request and we hold the lock. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_cond_broadcast(async_nif_cnd);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Take the request off the queue. */
|
2012-11-29 16:37:36 +00:00
|
|
|
STAILQ_REMOVE(&async_nif_reqs, req, async_nif_req_entry, entries); async_nif_req_count--;
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Now we need to remove this thread from the list of idle threads. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_worker_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
LIST_REMOVE(worker, entries);
|
|
|
|
|
|
|
|
/* Release the locks in reverse order that we acquired them,
|
|
|
|
so as not to self-deadlock. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_unlock(async_nif_worker_mutex);
|
|
|
|
enif_mutex_unlock(async_nif_req_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Finally, let's do the work! :) */
|
2012-11-29 16:37:36 +00:00
|
|
|
req->fn_work(req->env, req->ref, req->priv_data, &req->pid, req->args);
|
2012-11-25 20:42:18 +00:00
|
|
|
req->fn_post(req->args);
|
|
|
|
enif_free(req->args);
|
|
|
|
enif_free_env(req->env);
|
|
|
|
enif_free(req);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
enif_thread_exit(0);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
static void async_nif_unload(void)
|
2012-11-25 20:42:18 +00:00
|
|
|
{
|
|
|
|
unsigned int i;
|
|
|
|
|
|
|
|
/* Signal the worker threads, stop what you're doing and exit. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_req_mutex);
|
|
|
|
async_nif_shutdown = 1;
|
|
|
|
enif_cond_broadcast(async_nif_cnd);
|
|
|
|
enif_mutex_unlock(async_nif_req_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Join for the now exiting worker threads. */
|
2012-11-29 16:37:36 +00:00
|
|
|
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; ++i) {
|
2012-11-25 20:42:18 +00:00
|
|
|
void *exit_value = 0; /* Ignore this. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_thread_join(async_nif_worker_entries[i].tid, &exit_value);
|
2012-11-25 20:42:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/* We won't get here until all threads have exited.
|
|
|
|
Patch things up, and carry on. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_req_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Worker threads are stopped, now toss anything left in the queue. */
|
2012-11-29 16:37:36 +00:00
|
|
|
struct async_nif_req_entry *req = NULL;
|
|
|
|
STAILQ_FOREACH(req, &async_nif_reqs, entries) {
|
|
|
|
STAILQ_REMOVE(&async_nif_reqs, STAILQ_LAST(&async_nif_reqs, async_nif_req_entry, entries),
|
|
|
|
async_nif_req_entry, entries);
|
|
|
|
#ifdef PULSE
|
|
|
|
PULSE_SEND(NULL, &req->pid, req->env,
|
|
|
|
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
|
|
|
|
enif_make_atom(req->env, "shutdown")));
|
|
|
|
#else
|
|
|
|
enif_send(NULL, &req->pid, req->env,
|
2012-11-25 20:42:18 +00:00
|
|
|
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
|
|
|
|
enif_make_atom(req->env, "shutdown")));
|
2012-11-29 16:37:36 +00:00
|
|
|
#endif
|
2012-11-25 20:42:18 +00:00
|
|
|
req->fn_post(req->args);
|
|
|
|
enif_free(req->args);
|
|
|
|
enif_free(req);
|
2012-11-29 16:37:36 +00:00
|
|
|
async_nif_req_count--;
|
2012-11-25 20:42:18 +00:00
|
|
|
}
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_unlock(async_nif_req_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
|
|
|
|
enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL;
|
|
|
|
enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL;
|
|
|
|
enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL;
|
2012-11-25 20:42:18 +00:00
|
|
|
}
|
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
static int async_nif_init(void)
|
2012-11-25 20:42:18 +00:00
|
|
|
{
|
2012-11-29 16:37:36 +00:00
|
|
|
int i;
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Don't init more than once. */
|
2012-11-29 16:37:36 +00:00
|
|
|
if (async_nif_req_mutex) return 0;
|
2012-11-25 20:42:18 +00:00
|
|
|
|
2012-11-29 16:37:36 +00:00
|
|
|
async_nif_req_mutex = enif_mutex_create(NULL);
|
|
|
|
async_nif_worker_mutex = enif_mutex_create(NULL);
|
|
|
|
async_nif_cnd = enif_cond_create(NULL);
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Setup the requests management. */
|
2012-11-29 16:37:36 +00:00
|
|
|
async_nif_req_count = 0;
|
2012-11-25 20:42:18 +00:00
|
|
|
|
|
|
|
/* Setup the thread pool management. */
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_lock(async_nif_worker_mutex);
|
|
|
|
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
|
|
|
|
|
|
|
|
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
|
|
|
|
if (enif_thread_create(NULL, &async_nif_worker_entries[i].tid,
|
|
|
|
&async_nif_worker_fn, (void*)&async_nif_worker_entries[i], NULL) != 0) {
|
|
|
|
async_nif_shutdown = 1;
|
|
|
|
enif_cond_broadcast(async_nif_cnd);
|
|
|
|
enif_mutex_unlock(async_nif_worker_mutex);
|
|
|
|
while(i-- > 0) {
|
|
|
|
void *exit_value = 0; /* Ignore this. */
|
|
|
|
enif_thread_join(async_nif_worker_entries[i].tid, &exit_value);
|
|
|
|
}
|
|
|
|
memset(async_nif_worker_entries, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS, 0);
|
|
|
|
enif_cond_destroy(async_nif_cnd); async_nif_cnd = NULL;
|
|
|
|
enif_mutex_destroy(async_nif_req_mutex); async_nif_req_mutex = NULL;
|
|
|
|
enif_mutex_destroy(async_nif_worker_mutex); async_nif_worker_mutex = NULL;
|
|
|
|
return -1;
|
|
|
|
}
|
2012-11-25 20:42:18 +00:00
|
|
|
}
|
2012-11-29 16:37:36 +00:00
|
|
|
enif_mutex_unlock(async_nif_worker_mutex);
|
2012-11-25 20:42:18 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
#if defined(__cplusplus)
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
#endif // __ASYNC_NIF_H__
|