A bit of cleanup

This commit is contained in:
Gregory Burd 2013-04-14 19:33:14 -04:00
parent 15a9a70c41
commit 9ed2137730
2 changed files with 19 additions and 7 deletions

View file

@ -28,6 +28,10 @@ extern "C" {
#include "queue.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h"
#endif
#define ASYNC_NIF_MAX_WORKERS 128
struct async_nif_req_entry {
@ -43,6 +47,10 @@ struct async_nif_req_entry {
struct async_nif_work_queue {
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
unsigned int depth;
#ifdef ASYNC_NIF_STATS
struct async_stats stats;
#endif
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
@ -54,7 +62,6 @@ struct async_nif_worker_entry {
};
struct async_nif_state {
unsigned int req_count;
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
@ -96,7 +103,7 @@ struct async_nif_state {
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memset(req, 0, sizeof(struct async_nif_req_entry)); \
memset(req, 0, sizeof(struct async_nif_req_entry)); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
@ -163,19 +170,20 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
threads. */
unsigned int qid = async_nif->next_q; // Keep a local to avoid the race.
struct async_nif_work_queue *q = &async_nif->queues[qid];
if (q->depth > 10)
async_nif->next_q = (qid + 1) % async_nif->num_queues;
/* Otherwise, add the request to the work queue. */
enif_mutex_lock(q->reqs_mutex);
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
async_nif->req_count++;
q->depth++;
//fprintf(stderr, "enqueued %d (%d)\r\n", qid, async_nif->req_count); fflush(stderr);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer. */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"),
enif_make_int(req->env, async_nif->req_count)));
enif_make_int(req->env, q->depth)));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
@ -210,7 +218,7 @@ async_nif_worker_fn(void *arg)
/* Take the request off the queue. */
//fprintf(stderr, "worker %d queue %d performing req (%d)\r\n", worker_id, (worker_id % async_nif->num_queues), async_nif->req_count); fflush(stderr);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
async_nif->req_count--;
q->depth--;
enif_mutex_unlock(q->reqs_mutex);
/* Finally, do the work. */
@ -275,7 +283,6 @@ async_nif_unload(ErlNifEnv *env)
req->fn_post(req->args);
enif_free(req->args);
enif_free(req);
async_nif->req_count--;
}
}
memset(async_nif, 0, sizeof(struct async_nif_state));
@ -305,12 +312,12 @@ async_nif_load(void)
async_nif->num_queues = info.scheduler_threads;
async_nif->next_q = 0;
async_nif->req_count = 0;
async_nif->shutdown = 0;
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->depth = 0;
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
}
@ -319,6 +326,7 @@ async_nif_load(void)
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
// TODO:
//unsigned int num_workers = ASYNC_NIF_MAX_WORKERS - (ASYNC_NIF_MAX_WORKERS % async_nif->num_queues);
unsigned int num_workers = async_nif->num_queues;
//unsigned int allocation = 1;

View file

@ -27,6 +27,10 @@
#include "async_nif.h"
#include "khash.h"
#ifdef WTERL_STATS
#include "stats.h"
#endif
#ifdef DEBUG
#include <stdio.h>
#include <stdarg.h>