From db953f5b39b896fd404f16fc49848045fb1d8c11 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Wed, 17 Apr 2013 18:26:59 -0400 Subject: [PATCH] Moved num_queue estimate earlier so as to ensure that the amount of memory allocated, zero'ed and free'd was consistent. Skip free'ing async environment as Erlang will free that for us when we no longer reference it. Fix the memory leak (or at least one of them) by no longer copying the Uri into the hash table. --- c_src/async_nif.h | 70 ++++++++++++++++++++++++----------------------- c_src/wterl.c | 10 +------ 2 files changed, 37 insertions(+), 43 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index abe6ed9..36526a9 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -50,6 +50,10 @@ extern "C" { q->s = n + 1; \ return q; \ } \ + static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ + memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ + enif_free(q); \ + } \ static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ q->items[q->h] = n; \ q->h = (q->h + 1) % q->s; \ @@ -61,10 +65,6 @@ extern "C" { q->t = (q->t + 1) % q->s; \ return n; \ } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ return (q->h - q->t + q->s) % q->s; \ } \ @@ -331,6 +331,7 @@ async_nif_unload(ErlNifEnv *env) unsigned int i; struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); unsigned int num_queues = async_nif->num_queues; + struct async_nif_work_queue *q = NULL; /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first @@ -338,16 +339,15 @@ async_nif_unload(ErlNifEnv *env) unlock. The enqueue function will take the queue mutex, then test for shutdown condition, then enqueue only if not shutting down. */ - for (i = 0; i < num_queues; i++) - enif_mutex_lock(async_nif->queues[i].reqs_mutex); - async_nif->shutdown = 1; - for (i = 0; i < num_queues; i++) - enif_mutex_unlock(async_nif->queues[i].reqs_mutex); - - /* Wake up any waiting worker threads. */ for (i = 0; i < num_queues; i++) { - struct async_nif_work_queue *q = &async_nif->queues[i]; + q = &async_nif->queues[i]; + enif_mutex_lock(q->reqs_mutex); + } + async_nif->shutdown = 1; + for (i = 0; i < num_queues; i++) { + q = &async_nif->queues[i]; enif_cond_broadcast(q->reqs_cnd); + enif_mutex_unlock(q->reqs_mutex); } /* Join for the now exiting worker threads. */ @@ -358,9 +358,8 @@ async_nif_unload(ErlNifEnv *env) /* Cleanup requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { - struct async_nif_work_queue *q = &async_nif->queues[i]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); + q = &async_nif->queues[i]; + enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary? /* Worker threads are stopped, now toss anything left in the queue. */ struct async_nif_req_entry *req = NULL; @@ -370,13 +369,15 @@ async_nif_unload(ErlNifEnv *env) enif_make_atom(req->env, "shutdown"))); req->fn_post(req->args); enif_free(req->args); - enif_free_env(req->env); enif_free(req); }); fifo_q_free(reqs, q->reqs); + + enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary? + enif_mutex_destroy(q->reqs_mutex); + enif_cond_destroy(q->reqs_cnd); } - memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); enif_free(async_nif); } @@ -384,7 +385,7 @@ static void * async_nif_load(void) { static int has_init = 0; - unsigned int i, j; + unsigned int i, j, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -395,27 +396,28 @@ async_nif_load(void) /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); + /* Size the number of work queues according to schedulers. */ + if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) { + num_queues = ASYNC_NIF_MAX_WORKERS / 2; + } else { + int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads; + if (remainder != 0) + num_queues = info.scheduler_threads - remainder; + else + num_queues = info.scheduler_threads; + if (num_queues < 2) + num_queues = 2; + } + /* Init our portion of priv_data's module-specific state. */ async_nif = enif_alloc(sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * info.scheduler_threads); + sizeof(struct async_nif_work_queue) * num_queues); if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * info.scheduler_threads); + sizeof(struct async_nif_work_queue) * num_queues); - if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) { - async_nif->num_queues = ASYNC_NIF_MAX_WORKERS / 2; - } else { - int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads; - if (remainder != 0) { - async_nif->num_queues = info.scheduler_threads - remainder; - } else { - async_nif->num_queues = info.scheduler_threads; - } - if (async_nif->num_queues < 2) { - async_nif->num_queues = 2; - } - } + async_nif->num_queues = num_queues; async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed async_nif->next_q = 0; async_nif->shutdown = 0; diff --git a/c_src/wterl.c b/c_src/wterl.c index c7837ff..992a9e0 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -198,16 +198,8 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char return rc; } - char *key = enif_alloc(sizeof(Uri)); - if (!key) { - session->close(session, NULL); - enif_mutex_unlock(conn_handle->contexts_mutex); - return ENOMEM; - } - memcpy(key, uri, 128); - int itr_status; - itr = kh_put(cursors, h, key, &itr_status); + itr = kh_put(cursors, h, uri, &itr_status); kh_value(h, itr) = *cursor; enif_mutex_unlock(conn_handle->contexts_mutex); }