diff --git a/c_src/async_nif.h b/c_src/async_nif.h index abe6ed9..36526a9 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -50,6 +50,10 @@ extern "C" { q->s = n + 1; \ return q; \ } \ + static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ + memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ + enif_free(q); \ + } \ static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ q->items[q->h] = n; \ q->h = (q->h + 1) % q->s; \ @@ -61,10 +65,6 @@ extern "C" { q->t = (q->t + 1) % q->s; \ return n; \ } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ return (q->h - q->t + q->s) % q->s; \ } \ @@ -331,6 +331,7 @@ async_nif_unload(ErlNifEnv *env) unsigned int i; struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); unsigned int num_queues = async_nif->num_queues; + struct async_nif_work_queue *q = NULL; /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first @@ -338,16 +339,15 @@ async_nif_unload(ErlNifEnv *env) unlock. The enqueue function will take the queue mutex, then test for shutdown condition, then enqueue only if not shutting down. */ - for (i = 0; i < num_queues; i++) - enif_mutex_lock(async_nif->queues[i].reqs_mutex); - async_nif->shutdown = 1; - for (i = 0; i < num_queues; i++) - enif_mutex_unlock(async_nif->queues[i].reqs_mutex); - - /* Wake up any waiting worker threads. */ for (i = 0; i < num_queues; i++) { - struct async_nif_work_queue *q = &async_nif->queues[i]; + q = &async_nif->queues[i]; + enif_mutex_lock(q->reqs_mutex); + } + async_nif->shutdown = 1; + for (i = 0; i < num_queues; i++) { + q = &async_nif->queues[i]; enif_cond_broadcast(q->reqs_cnd); + enif_mutex_unlock(q->reqs_mutex); } /* Join for the now exiting worker threads. */ @@ -358,9 +358,8 @@ async_nif_unload(ErlNifEnv *env) /* Cleanup requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { - struct async_nif_work_queue *q = &async_nif->queues[i]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); + q = &async_nif->queues[i]; + enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary? /* Worker threads are stopped, now toss anything left in the queue. */ struct async_nif_req_entry *req = NULL; @@ -370,13 +369,15 @@ async_nif_unload(ErlNifEnv *env) enif_make_atom(req->env, "shutdown"))); req->fn_post(req->args); enif_free(req->args); - enif_free_env(req->env); enif_free(req); }); fifo_q_free(reqs, q->reqs); + + enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary? + enif_mutex_destroy(q->reqs_mutex); + enif_cond_destroy(q->reqs_cnd); } - memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); enif_free(async_nif); } @@ -384,7 +385,7 @@ static void * async_nif_load(void) { static int has_init = 0; - unsigned int i, j; + unsigned int i, j, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -395,27 +396,28 @@ async_nif_load(void) /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); + /* Size the number of work queues according to schedulers. */ + if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) { + num_queues = ASYNC_NIF_MAX_WORKERS / 2; + } else { + int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads; + if (remainder != 0) + num_queues = info.scheduler_threads - remainder; + else + num_queues = info.scheduler_threads; + if (num_queues < 2) + num_queues = 2; + } + /* Init our portion of priv_data's module-specific state. */ async_nif = enif_alloc(sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * info.scheduler_threads); + sizeof(struct async_nif_work_queue) * num_queues); if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * info.scheduler_threads); + sizeof(struct async_nif_work_queue) * num_queues); - if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) { - async_nif->num_queues = ASYNC_NIF_MAX_WORKERS / 2; - } else { - int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads; - if (remainder != 0) { - async_nif->num_queues = info.scheduler_threads - remainder; - } else { - async_nif->num_queues = info.scheduler_threads; - } - if (async_nif->num_queues < 2) { - async_nif->num_queues = 2; - } - } + async_nif->num_queues = num_queues; async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed async_nif->next_q = 0; async_nif->shutdown = 0; diff --git a/c_src/wterl.c b/c_src/wterl.c index c7837ff..992a9e0 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -198,16 +198,8 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char return rc; } - char *key = enif_alloc(sizeof(Uri)); - if (!key) { - session->close(session, NULL); - enif_mutex_unlock(conn_handle->contexts_mutex); - return ENOMEM; - } - memcpy(key, uri, 128); - int itr_status; - itr = kh_put(cursors, h, key, &itr_status); + itr = kh_put(cursors, h, uri, &itr_status); kh_value(h, itr) = *cursor; enif_mutex_unlock(conn_handle->contexts_mutex); }