/*
 * async_nif: An async thread-pool layer for Erlang's NIF API
 *
 * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
 * Author: Gregory Burd <greg@basho.com> <greg@burd.me>
 *
 * This file is provided to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__

#if defined(__cplusplus)
extern "C" {
#endif

#include <assert.h>

#include "queue.h"

#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
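
/* This header expects <erl_nif.h> to have been included already by the file
   that includes it (it uses ErlNifEnv, ERL_NIF_TERM and the enif_* API), and
   it relies on memset/memcpy and the EINVAL/EAGAIN/ENOMEM constants.  The two
   standard includes and the no-op DPRINTF fallback below are hedged additions
   so the header stands alone; drop them if your project already provides
   equivalents (e.g. a shared debug header defining DPRINTF). */
#include <string.h>  /* memset, memcpy */
#include <errno.h>   /* EINVAL, EAGAIN, ENOMEM */

#ifndef DPRINTF
#define DPRINTF(fmt, ...) ((void)0)
#endif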

#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 4096
#define ASYNC_NIF_MAX_QUEUED_REQS (ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS)
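
/* One queued NIF call.  A request carries the caller's reference term and pid
   (for the eventual reply), a private ErlNifEnv that owns the copied terms,
   the heap-allocated argument frame, and the work/post-work callbacks that
   ASYNC_NIF_DECL() generates for the wrapped function. */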
struct async_nif_req_entry {
  ERL_NIF_TERM ref;
  ErlNifEnv *env;
  ErlNifPid pid;
  void *args;
  void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
  void (*fn_post)(void *);
  STAILQ_ENTRY(async_nif_req_entry) entries;
};
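
/* A single work queue: a FIFO of pending requests protected by a mutex and
   condition variable, a count of workers currently servicing it, its current
   depth, and a pointer to the next queue so idle workers can migrate. */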
struct async_nif_work_queue {
  unsigned int num_workers;
  unsigned int depth;
  ErlNifMutex *reqs_mutex;
  ErlNifCond *reqs_cnd;
  struct async_nif_work_queue *next;
  STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
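
/* Bookkeeping for one worker thread: its thread id, the queue it was started
   on, and the list linkage used to park it on we_joining when it exits so it
   can later be reaped with enif_thread_join(). */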
struct async_nif_worker_entry {
  ErlNifTid tid;
  unsigned int worker_id;
  struct async_nif_state *async_nif;
  struct async_nif_work_queue *q;
  SLIST_ENTRY(async_nif_worker_entry) entries;
};
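
/* Global state for the thread pool.  Note that queues[] is a flexible array
   member: async_nif_load() allocates this struct and its queues as a single
   block of sizeof(struct async_nif_state) plus
   num_queues * sizeof(struct async_nif_work_queue) bytes. */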
struct async_nif_state {
  unsigned int shutdown;
  ErlNifMutex *we_mutex;
  unsigned int we_active;
  SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
  unsigned int num_queues;
  unsigned int next_q;
  STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
  unsigned int num_reqs;
  ErlNifMutex *recycled_req_mutex;
  struct async_nif_work_queue queues[];
};

#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
  struct decl ## _args frame; \
  static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
    UNUSED(worker_id); \
    DPRINTF("async_nif: calling \"%s\"", __func__); \
    do work_block while(0); \
    DPRINTF("async_nif: returned from \"%s\"", __func__); \
  } \
  static void fn_post_ ## decl (struct decl ## _args *args) { \
    UNUSED(args); \
    DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
    do post_block while(0); \
    DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
  } \
  static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
    struct decl ## _args on_stack_args; \
    struct decl ## _args *args = &on_stack_args; \
    struct decl ## _args *copy_of_args; \
    struct async_nif_req_entry *req = NULL; \
    unsigned int affinity = 0; \
    ErlNifEnv *new_env = NULL; \
    /* argv[0] is a ref used for selective recv */ \
    const ERL_NIF_TERM *argv = argv_in + 1; \
    argc -= 1; \
    /* Note: !!! this assumes that the first element of priv_data is ours */ \
    struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
    if (async_nif->shutdown) { \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "shutdown")); \
    } \
    req = async_nif_reuse_req(async_nif); \
    if (!req) { \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "eagain")); \
    } \
    new_env = req->env; \
    DPRINTF("async_nif: calling \"%s\"", __func__); \
    do pre_block while(0); \
    DPRINTF("async_nif: returned from \"%s\"", __func__); \
    copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
    if (!copy_of_args) { \
      fn_post_ ## decl (args); \
      async_nif_recycle_req(req, async_nif); \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "enomem")); \
    } \
    memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
    req->ref = enif_make_copy(new_env, argv_in[0]); \
    enif_self(env, &req->pid); \
    req->args = (void*)copy_of_args; \
    req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
    req->fn_post = (void (*)(void *))fn_post_ ## decl; \
    int h = -1; \
    if (affinity) \
      h = ((unsigned int)affinity) % async_nif->num_queues; \
    ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
    if (!reply) { \
      fn_post_ ## decl (args); \
      async_nif_recycle_req(req, async_nif); \
      enif_free(copy_of_args); \
      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                              enif_make_atom(env, "eagain")); \
    } \
    return reply; \
  }

#define ASYNC_NIF_INIT(name) \
  static ErlNifMutex *name##_async_nif_coord = NULL;

#define ASYNC_NIF_LOAD(name, priv) do { \
    if (!name##_async_nif_coord) \
      name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
    enif_mutex_lock(name##_async_nif_coord); \
    priv = async_nif_load(); \
    enif_mutex_unlock(name##_async_nif_coord); \
  } while(0);

#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
    if (!name##_async_nif_coord) \
      name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
    enif_mutex_lock(name##_async_nif_coord); \
    async_nif_unload(env, priv); \
    enif_mutex_unlock(name##_async_nif_coord); \
    enif_mutex_destroy(name##_async_nif_coord); \
    name##_async_nif_coord = NULL; \
  } while(0);

#define ASYNC_NIF_UPGRADE(name, env) do { \
    if (!name##_async_nif_coord) \
      name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
    enif_mutex_lock(name##_async_nif_coord); \
    async_nif_upgrade(env); \
    enif_mutex_unlock(name##_async_nif_coord); \
  } while(0);
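
/* Wiring sketch (hypothetical example, not part of this header): a NIF module
   would typically keep the async_nif state as the *first* field of its private
   data -- ASYNC_NIF_DECL() dereferences enif_priv_data(env) as a
   struct async_nif_state ** -- and drive the macros above from its ErlNifEntry
   callbacks, along the lines of:

       struct my_priv { struct async_nif_state *async_nif; };  // must be first
       ASYNC_NIF_INIT(my_nif);

       static int my_on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM info) {
           struct my_priv *priv = enif_alloc(sizeof(struct my_priv));
           if (!priv) return -1;
           ASYNC_NIF_LOAD(my_nif, priv->async_nif);
           *priv_data = priv;
           return priv->async_nif ? 0 : -1;
       }

       static void my_on_unload(ErlNifEnv *env, void *priv_data) {
           struct my_priv *priv = (struct my_priv *)priv_data;
           ASYNC_NIF_UNLOAD(my_nif, env, priv->async_nif);
           enif_free(priv);
       }

   The names my_priv, my_nif, my_on_load and my_on_unload are illustrative only. */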

#define ASYNC_NIF_RETURN_BADARG() do { \
    async_nif_recycle_req(req, async_nif); \
    return enif_make_badarg(env); \
  } while(0);

#define ASYNC_NIF_WORK_ENV new_env

#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
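
/* Usage sketch (hypothetical example, not part of this header): each async
   NIF is declared with ASYNC_NIF_DECL(), which takes an argument-frame struct
   body plus pre/work/post blocks.  The pre block runs on the calling scheduler
   thread (validate arguments, stash them in args); the work block runs later
   on a worker thread and replies with ASYNC_NIF_REPLY(); the post block frees
   anything the frame still holds.  For example:

       ASYNC_NIF_DECL(
         my_echo_nif,
         { // argument frame copied onto the request
           ERL_NIF_TERM term;
         },
         { // pre: runs in the caller, argv[] excludes the leading ref
           if (argc != 1) ASYNC_NIF_RETURN_BADARG();
           args->term = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[0]);
         },
         { // work: runs on a worker thread
           ASYNC_NIF_REPLY(enif_make_tuple2(env, enif_make_atom(env, "ok"), args->term));
         },
         { // post: nothing to clean up here
         });

   On the Erlang side the wrapper is expected to pass a fresh reference as the
   first argument and then selectively receive {Ref, Reply}, since
   ASYNC_NIF_REPLY() sends enif_make_tuple2(env, ref, msg) to the caller's pid.
   The name my_echo_nif above is illustrative only. */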

/**
 * Return a request structure from the recycled req queue if one exists,
 * otherwise create one.
 */
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
  struct async_nif_req_entry *req = NULL;
  ErlNifEnv *env = NULL;

  enif_mutex_lock(async_nif->recycled_req_mutex);
  if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
    if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
      req = enif_alloc(sizeof(struct async_nif_req_entry));
      if (req) {
        memset(req, 0, sizeof(struct async_nif_req_entry));
        env = enif_alloc_env();
        if (env) {
          req->env = env;
          __sync_fetch_and_add(&async_nif->num_reqs, 1);
        } else {
          enif_free(req);
          req = NULL;
        }
      }
    }
  } else {
    req = STAILQ_FIRST(&async_nif->recycled_reqs);
    STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
  }
  enif_mutex_unlock(async_nif->recycled_req_mutex);
  return req;
}

/**
 * Store the request for future re-use.
 *
 * req       a request entry with an ErlNifEnv* which will be cleared
 *           before reuse, but not until then.
 * async_nif a handle to our state so that we can find and use the mutex.
 */
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
  ErlNifEnv *env = NULL;
  enif_mutex_lock(async_nif->recycled_req_mutex);
  enif_clear_env(req->env);
  env = req->env;
  memset(req, 0, sizeof(struct async_nif_req_entry));
  req->env = env;
  STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
  enif_mutex_unlock(async_nif->recycled_req_mutex);
}

static void *async_nif_worker_fn(void *);

/**
 * Start up a worker thread.
 */
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
  struct async_nif_worker_entry *we;

  if (0 == q)
    return EINVAL;

  enif_mutex_lock(async_nif->we_mutex);

  we = SLIST_FIRST(&async_nif->we_joining);
  while(we != NULL) {
    struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
    SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
    void *exit_value = 0; /* We ignore the thread_join's exit value. */
    enif_thread_join(we->tid, &exit_value);
    enif_free(we);
    async_nif->we_active--;
    we = n;
  }

  if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
    enif_mutex_unlock(async_nif->we_mutex);
    return EAGAIN;
  }

  we = enif_alloc(sizeof(struct async_nif_worker_entry));
  if (!we) {
    enif_mutex_unlock(async_nif->we_mutex);
    return ENOMEM;
  }
  memset(we, 0, sizeof(struct async_nif_worker_entry));
  we->worker_id = async_nif->we_active++;
  we->async_nif = async_nif;
  we->q = q;

  enif_mutex_unlock(async_nif->we_mutex);
  return enif_thread_create(NULL, &we->tid, &async_nif_worker_fn, (void*)we, 0);
}

/**
 * Enqueue a request for processing by a worker thread.
 *
 * Places the request into a work queue determined either by the
 * provided affinity or by iterating through the available queues.
 */
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
  /* Identify the most appropriate worker for this request. */
  unsigned int i, last_qid, qid = 0;
  struct async_nif_work_queue *q = NULL;
  double avg_depth = 0.0;

  /* Either we're choosing a queue based on some affinity/hinted value or we
     need to select the next queue in the rotation and atomically update that
     global value (next_q is shared across worker threads). */
  if (hint >= 0) {
    qid = (unsigned int)hint;
  } else {
    do {
      last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
      qid = (last_qid + 1) % async_nif->num_queues;
    } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
  }

  /* Now we inspect and iterate across the set of queues trying to select one
     that isn't too full or too slow. */
  for (i = 0; i < async_nif->num_queues; i++) {
    /* Compute the average queue depth not counting queues which are empty or
       the queue we're considering right now. */
    unsigned int j, n = 0;
    for (j = 0; j < async_nif->num_queues; j++) {
      if (j != qid && async_nif->queues[j].depth != 0) {
        n++;
        avg_depth += async_nif->queues[j].depth;
      }
    }
    if (avg_depth) avg_depth /= n;

    /* Lock this queue under consideration, then check for shutdown.  While
       we hold this lock either a) we're shutting down so exit now or b) this
       queue will be valid until we release the lock. */
    q = &async_nif->queues[qid];
    enif_mutex_lock(q->reqs_mutex);
    if (async_nif->shutdown) {
      enif_mutex_unlock(q->reqs_mutex);
      return 0;
    }

    /* Try not to enqueue a request into a queue that isn't keeping up with
       the request volume. */
    if (q->depth <= avg_depth) break;
    else {
      enif_mutex_unlock(q->reqs_mutex);
      qid = (qid + 1) % async_nif->num_queues;
    }
  }

  /* If the for loop finished then we didn't find a suitable queue for this
     request, meaning we're backed up, so trigger eagain.  Note that if we left
     the loop in this way we hold no lock. */
  if (i == async_nif->num_queues) return 0;

  /* Add the request to the queue. */
  STAILQ_INSERT_TAIL(&q->reqs, req, entries);
  __sync_fetch_and_add(&q->depth, 1);

  /* We've selected a queue for this new request, now check to make sure there
     are enough workers actively processing requests on this queue. */
  while (q->depth > q->num_workers) {
    switch(async_nif_start_worker(async_nif, q)) {
    case EINVAL: case ENOMEM: default: return 0;
    case EAGAIN: continue;
    case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
    }
  }
 done:;

  /* Build the term before releasing the lock so as not to race on the use of
     the req pointer (which will soon become invalid in another thread
     performing the request). */
  ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
                                        enif_make_atom(req->env, "enqueued"));
  enif_cond_signal(q->reqs_cnd);
  enif_mutex_unlock(q->reqs_mutex);
  return reply;
}
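
/* Queue selection in async_nif_enqueue_req() in a nutshell: start from the
   hinted queue (or the next slot in the round-robin rotation), and accept a
   queue only if its depth is no worse than the average depth of the other
   busy queues; otherwise move on to the next queue.  If a full pass over
   every queue finds nothing acceptable the function returns 0, which
   ASYNC_NIF_DECL() surfaces to Erlang as {error, eagain}. */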

/**
 * Worker threads execute this function.  Here each worker pulls requests off
 * of its queue, executes the work and continues doing that until it sees that
 * the shutdown flag is set, at which point it exits.
 */
static void *
async_nif_worker_fn(void *arg)
{
  struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
  unsigned int worker_id = we->worker_id;
  struct async_nif_state *async_nif = we->async_nif;
  struct async_nif_work_queue *q = we->q;
  struct async_nif_req_entry *req = NULL;
  unsigned int tries = async_nif->num_queues;

  for(;;) {
    /* Examine the request queue, are there things to be done? */
    enif_mutex_lock(q->reqs_mutex);
  check_again_for_work:
    if (async_nif->shutdown) {
      enif_mutex_unlock(q->reqs_mutex);
      break;
    }
    if (STAILQ_EMPTY(&q->reqs)) {
      /* Queue is empty so we wait for more work to arrive. */
      if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
        enif_mutex_unlock(q->reqs_mutex);
        if (tries == 0 && q == we->q) {
          tries = async_nif->num_queues;
          continue;
        } else {
          tries--;
          __sync_fetch_and_add(&q->num_workers, -1);
          q = q->next;
          __sync_fetch_and_add(&q->num_workers, 1);
          continue; // try next queue
        }
      } else {
        enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
        goto check_again_for_work;
      }
    } else {
      /* At this point the next req is ours to process and we hold the
         reqs_mutex lock.  Take the request off the queue. */
      req = STAILQ_FIRST(&q->reqs);
      STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
      __sync_fetch_and_add(&q->depth, -1);

      /* Ensure that there is at least one other worker thread watching this
         queue. */
      enif_cond_signal(q->reqs_cnd);
      enif_mutex_unlock(q->reqs_mutex);

      /* Perform the work. */
      req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);

      /* Now call the post-work cleanup function. */
      req->fn_post(req->args);

      /* Clean up req for reuse. */
      req->ref = 0;
      req->fn_work = 0;
      req->fn_post = 0;
      enif_free(req->args);
      req->args = NULL;
      async_nif_recycle_req(req, async_nif);
      req = NULL;
    }
  }
  enif_mutex_lock(async_nif->we_mutex);
  SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
  enif_mutex_unlock(async_nif->we_mutex);
  __sync_fetch_and_add(&q->num_workers, -1);
  enif_thread_exit(0);
  return 0;
}

static void
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
  unsigned int i;
  unsigned int num_queues = async_nif->num_queues;
  struct async_nif_work_queue *q = NULL;
  struct async_nif_req_entry *req = NULL;
  struct async_nif_worker_entry *we = NULL;
  UNUSED(env);

  /* Signal the worker threads, stop what you're doing and exit.  To ensure
     that we don't race with the enqueue() process we first lock all the worker
     queues, then set shutdown to true, then unlock.  The enqueue function will
     take the queue mutex, then test for the shutdown condition, then enqueue
     only if not shutting down. */
  for (i = 0; i < num_queues; i++) {
    q = &async_nif->queues[i];
    enif_mutex_lock(q->reqs_mutex);
  }
  /* Set the shutdown flag so that worker threads will not continue
     executing requests. */
  async_nif->shutdown = 1;
  for (i = 0; i < num_queues; i++) {
    q = &async_nif->queues[i];
    enif_mutex_unlock(q->reqs_mutex);
  }

  /* Join for the now exiting worker threads. */
  while(async_nif->we_active > 0) {
    for (i = 0; i < num_queues; i++)
      enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
    enif_mutex_lock(async_nif->we_mutex);
    we = SLIST_FIRST(&async_nif->we_joining);
    while(we != NULL) {
      struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
      SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
      void *exit_value = 0; /* We ignore the thread_join's exit value. */
      enif_thread_join(we->tid, &exit_value);
      enif_free(we);
      async_nif->we_active--;
      we = n;
    }
    enif_mutex_unlock(async_nif->we_mutex);
  }
  enif_mutex_destroy(async_nif->we_mutex);

  /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
  for (i = 0; i < num_queues; i++) {
    q = &async_nif->queues[i];

    /* Worker threads are stopped, now toss anything left in the queue. */
    req = NULL;
    req = STAILQ_FIRST(&q->reqs);
    while(req != NULL) {
      struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
      enif_clear_env(req->env);
      enif_send(NULL, &req->pid, req->env,
                enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
                                 enif_make_atom(req->env, "shutdown")));
      req->fn_post(req->args);
      enif_free_env(req->env);
      enif_free(req->args);
      enif_free(req);
      req = n;
    }
    enif_mutex_destroy(q->reqs_mutex);
    enif_cond_destroy(q->reqs_cnd);
  }

  /* Free any req structures sitting unused on the recycle queue. */
  enif_mutex_lock(async_nif->recycled_req_mutex);
  req = NULL;
  req = STAILQ_FIRST(&async_nif->recycled_reqs);
  while(req != NULL) {
    struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
    enif_free_env(req->env);
    enif_free(req);
    req = n;
  }

  enif_mutex_unlock(async_nif->recycled_req_mutex);
  enif_mutex_destroy(async_nif->recycled_req_mutex);
  memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
  enif_free(async_nif);
}

static void *
async_nif_load()
{
  static int has_init = 0;
  unsigned int i, num_queues;
  ErlNifSysInfo info;
  struct async_nif_state *async_nif;

  /* Don't init more than once. */
  if (has_init) return 0;
  else has_init = 1;

  /* Find out how many schedulers there are. */
  enif_system_info(&info, sizeof(ErlNifSysInfo));

  /* Size the number of work queues according to schedulers. */
  if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
    num_queues = ASYNC_NIF_MAX_WORKERS / 2;
  } else {
    int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
    if (remainder != 0)
      num_queues = info.scheduler_threads - remainder;
    else
      num_queues = info.scheduler_threads;
    if (num_queues < 2)
      num_queues = 2;
  }
  num_queues *= 32;

  /* Init our portion of priv_data's module-specific state. */
  async_nif = enif_alloc(sizeof(struct async_nif_state) +
                         sizeof(struct async_nif_work_queue) * num_queues);
  if (!async_nif)
    return NULL;
  memset(async_nif, 0, sizeof(struct async_nif_state) +
         sizeof(struct async_nif_work_queue) * num_queues);

  async_nif->num_queues = num_queues;
  async_nif->we_active = 0;
  async_nif->next_q = 0;
  async_nif->shutdown = 0;
  STAILQ_INIT(&async_nif->recycled_reqs);
  async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
  async_nif->we_mutex = enif_mutex_create("we");
  SLIST_INIT(&async_nif->we_joining);

  for (i = 0; i < async_nif->num_queues; i++) {
    struct async_nif_work_queue *q = &async_nif->queues[i];
    STAILQ_INIT(&q->reqs);
    q->reqs_mutex = enif_mutex_create("reqs");
    q->reqs_cnd = enif_cond_create("reqs");
    q->next = &async_nif->queues[(i + 1) % num_queues];
  }
  return async_nif;
}
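
/* Sizing note for async_nif_load(): for example, with 8 scheduler threads,
   8 is not greater than ASYNC_NIF_MAX_WORKERS / 2 (512) and 1024 % 8 == 0,
   so num_queues starts at 8; the final num_queues *= 32 then yields 256 work
   queues. */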

static void
async_nif_upgrade(ErlNifEnv *env)
{
  UNUSED(env);
  // TODO:
}

#if defined(__cplusplus)
}
#endif

#endif // __ASYNC_NIF_H__