Moved the num_queues estimate earlier so that the amount of memory
allocated, zeroed and freed is consistent.  Skip freeing the async
environment, as Erlang will free it for us once we no longer reference
it.  Fix the memory leak (or at least one of them) by no longer copying
the Uri into the hash table.
Gregory Burd 2013-04-17 18:26:59 -04:00
parent 1ae8e5698f
commit db953f5b39
2 changed files with 37 additions and 43 deletions
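For context on the first point: computing the queue count before the allocation means one value drives the enif_alloc, the memset at load, and the memset and free at unload, so the three sizes cannot drift apart. A minimal standalone sketch of that pattern, using plain malloc/free and placeholder struct names rather than the module's own:

/* Sketch of the "size once, reuse everywhere" pattern; placeholder names. */
#include <stdlib.h>
#include <string.h>

struct work_queue { int pending; };

struct state {
    unsigned int num_queues;          /* recorded at allocation time */
    struct work_queue queues[];       /* flexible array of num_queues entries */
};

static struct state *state_new(unsigned int num_queues)
{
    size_t sz = sizeof(struct state) + sizeof(struct work_queue) * num_queues;
    struct state *s = malloc(sz);
    if (!s)
        return NULL;
    memset(s, 0, sz);                 /* zeroed with exactly the allocated size */
    s->num_queues = num_queues;
    return s;
}

static void state_free(struct state *s)
{
    /* The scrub-and-free recomputes the size from the stored count, so it
     * always matches what was allocated and zeroed above. */
    size_t sz = sizeof(struct state) + sizeof(struct work_queue) * s->num_queues;
    memset(s, 0, sz);
    free(s);
}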

async_nif.h

@@ -50,6 +50,10 @@ extern "C" {
         q->s = n + 1; \
         return q; \
     } \
+    static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
+        memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
+        enif_free(q); \
+    } \
     static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
         q->items[q->h] = n; \
         q->h = (q->h + 1) % q->s; \
@@ -61,10 +65,6 @@
         q->t = (q->t + 1) % q->s; \
         return n; \
     } \
-    static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
-        memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
-        enif_free(q); \
-    } \
     static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
         return (q->h - q->t + q->s) % q->s; \
     } \
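As an aside to the hunks above: each fifo_q_* macro instance expands into ordinary static inline functions per queue name. Roughly what the relocated _free (together with the _size helper) looks like for the "reqs" queue used later in this file; the field layout of the struct is inferred from the memset size and the put/get/size bodies, so treat it as a sketch rather than the header's exact definition:

/* Approximate expansion for the "reqs" queue; field layout inferred. */
#include <string.h>
#include "erl_nif.h"

struct async_nif_req_entry;                 /* element type, opaque here */

struct fifo_q__reqs {
    unsigned int h, t, s;                   /* head, tail, slot count */
    struct async_nif_req_entry *items[];    /* s slots follow the header */
};

static inline void fifo_q_reqs_free(struct fifo_q__reqs *q)
{
    /* Scrub the header plus every slot pointer, then release the block. */
    memset(q, 0, sizeof(struct fifo_q__reqs) + (q->s * sizeof(struct async_nif_req_entry *)));
    enif_free(q);
}

static inline unsigned int fifo_q_reqs_size(struct fifo_q__reqs *q)
{
    /* Occupied slots in the circular buffer. */
    return (q->h - q->t + q->s) % q->s;
}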
@@ -331,6 +331,7 @@ async_nif_unload(ErlNifEnv *env)
   unsigned int i;
   struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
   unsigned int num_queues = async_nif->num_queues;
+  struct async_nif_work_queue *q = NULL;

   /* Signal the worker threads, stop what you're doing and exit. To
      ensure that we don't race with the enqueue() process we first
@@ -338,16 +339,15 @@ async_nif_unload(ErlNifEnv *env)
      unlock. The enqueue function will take the queue mutex, then
      test for shutdown condition, then enqueue only if not shutting
      down. */
-  for (i = 0; i < num_queues; i++)
-      enif_mutex_lock(async_nif->queues[i].reqs_mutex);
-  async_nif->shutdown = 1;
-  for (i = 0; i < num_queues; i++)
-      enif_mutex_unlock(async_nif->queues[i].reqs_mutex);
-
-  /* Wake up any waiting worker threads. */
   for (i = 0; i < num_queues; i++) {
-      struct async_nif_work_queue *q = &async_nif->queues[i];
+      q = &async_nif->queues[i];
+      enif_mutex_lock(q->reqs_mutex);
+  }
+  async_nif->shutdown = 1;
+  for (i = 0; i < num_queues; i++) {
+      q = &async_nif->queues[i];
       enif_cond_broadcast(q->reqs_cnd);
+      enif_mutex_unlock(q->reqs_mutex);
   }

   /* Join for the now exiting worker threads. */
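The comment in this hunk carries the reasoning: take every queue mutex, set the shutdown flag, then wake and release each queue, while enqueue always locks and tests the flag before adding work, so no request can slip in after shutdown becomes visible. A generic pthreads illustration of that handshake, separate from the patch and with invented names:

/* Generic pthreads illustration of the shutdown handshake; names invented. */
#include <pthread.h>
#include <stdbool.h>

#define NQUEUES 4

struct queue {
    pthread_mutex_t mtx;
    pthread_cond_t  cnd;
    int             pending;
};

static struct queue queues[NQUEUES];
static bool shutting_down;

static void queues_init(void)
{
    for (int i = 0; i < NQUEUES; i++) {
        pthread_mutex_init(&queues[i].mtx, NULL);
        pthread_cond_init(&queues[i].cnd, NULL);
    }
}

/* Producer: take the queue mutex, test the flag, only then enqueue. */
static bool try_enqueue(struct queue *q)
{
    pthread_mutex_lock(&q->mtx);
    if (shutting_down) {                    /* shutdown already visible */
        pthread_mutex_unlock(&q->mtx);
        return false;
    }
    q->pending++;                           /* stand-in for queueing a request */
    pthread_cond_signal(&q->cnd);
    pthread_mutex_unlock(&q->mtx);
    return true;
}

/* Shutdown: hold every queue mutex while flipping the flag, then wake any
 * sleeping workers and release the locks so they can observe the flag. */
static void begin_shutdown(void)
{
    for (int i = 0; i < NQUEUES; i++)
        pthread_mutex_lock(&queues[i].mtx);
    shutting_down = true;
    for (int i = 0; i < NQUEUES; i++) {
        pthread_cond_broadcast(&queues[i].cnd);
        pthread_mutex_unlock(&queues[i].mtx);
    }
}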
@@ -358,9 +358,8 @@ async_nif_unload(ErlNifEnv *env)
   /* Cleanup requests, mutexes and conditions in each work queue. */
   for (i = 0; i < num_queues; i++) {
-      struct async_nif_work_queue *q = &async_nif->queues[i];
-      enif_mutex_destroy(q->reqs_mutex);
-      enif_cond_destroy(q->reqs_cnd);
+      q = &async_nif->queues[i];
+      enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary?

       /* Worker threads are stopped, now toss anything left in the queue. */
       struct async_nif_req_entry *req = NULL;
@@ -370,13 +369,15 @@ async_nif_unload(ErlNifEnv *env)
                                    enif_make_atom(req->env, "shutdown")));
           req->fn_post(req->args);
           enif_free(req->args);
-          enif_free_env(req->env);
           enif_free(req);
         });
       fifo_q_free(reqs, q->reqs);
+      enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary?
+      enif_mutex_destroy(q->reqs_mutex);
+      enif_cond_destroy(q->reqs_cnd);
   }
-  memset(async_nif, 0, sizeof(struct async_nif_state) +
-         sizeof(struct async_nif_work_queue) * num_queues);
+  memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
   enif_free(async_nif);
 }
@@ -384,7 +385,7 @@ static void *
 async_nif_load(void)
 {
   static int has_init = 0;
-  unsigned int i, j;
+  unsigned int i, j, num_queues;
   ErlNifSysInfo info;
   struct async_nif_state *async_nif;
@@ -395,27 +396,28 @@ async_nif_load(void)
   /* Find out how many schedulers there are. */
   enif_system_info(&info, sizeof(ErlNifSysInfo));

+  /* Size the number of work queues according to schedulers. */
+  if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
+      num_queues = ASYNC_NIF_MAX_WORKERS / 2;
+  } else {
+      int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
+      if (remainder != 0)
+          num_queues = info.scheduler_threads - remainder;
+      else
+          num_queues = info.scheduler_threads;
+      if (num_queues < 2)
+          num_queues = 2;
+  }
+
   /* Init our portion of priv_data's module-specific state. */
   async_nif = enif_alloc(sizeof(struct async_nif_state) +
-                         sizeof(struct async_nif_work_queue) * info.scheduler_threads);
+                         sizeof(struct async_nif_work_queue) * num_queues);
   if (!async_nif)
       return NULL;
   memset(async_nif, 0, sizeof(struct async_nif_state) +
-         sizeof(struct async_nif_work_queue) * info.scheduler_threads);
+         sizeof(struct async_nif_work_queue) * num_queues);

-  if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
-      async_nif->num_queues = ASYNC_NIF_MAX_WORKERS / 2;
-  } else {
-      int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
-      if (remainder != 0) {
-          async_nif->num_queues = info.scheduler_threads - remainder;
-      } else {
-          async_nif->num_queues = info.scheduler_threads;
-      }
-      if (async_nif->num_queues < 2) {
-          async_nif->num_queues = 2;
-      }
-  }
+  async_nif->num_queues = num_queues;
   async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
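The relocated heuristic caps the queue count at half the worker budget, otherwise rounds the scheduler count down so the workers divide evenly, with a floor of two queues. A small standalone check of that arithmetic, assuming a placeholder MAX_WORKERS of 64 (the real ASYNC_NIF_MAX_WORKERS is defined elsewhere in the header):

/* Worked example of the queue-sizing heuristic; MAX_WORKERS is a placeholder. */
#include <stdio.h>

#define MAX_WORKERS 64   /* stand-in for ASYNC_NIF_MAX_WORKERS */

static unsigned int size_queues(unsigned int schedulers)
{
    unsigned int num_queues;
    if (schedulers > MAX_WORKERS / 2) {
        num_queues = MAX_WORKERS / 2;
    } else {
        unsigned int remainder = MAX_WORKERS % schedulers;
        num_queues = remainder ? schedulers - remainder : schedulers;
        if (num_queues < 2)
            num_queues = 2;
    }
    return num_queues;
}

int main(void)
{
    /* e.g. 1 -> 2 (floor), 8 -> 8 (divides 64 evenly), 24 -> 8 (24 - 64 % 24),
     * 48 -> 32 (capped at MAX_WORKERS / 2). */
    unsigned int samples[] = { 1, 8, 24, 48 };
    for (unsigned int i = 0; i < 4; i++)
        printf("%u schedulers -> %u queues\n", samples[i], size_queues(samples[i]));
    return 0;
}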

wterl.c

@@ -198,16 +198,8 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
             return rc;
         }

-        char *key = enif_alloc(sizeof(Uri));
-        if (!key) {
-            session->close(session, NULL);
-            enif_mutex_unlock(conn_handle->contexts_mutex);
-            return ENOMEM;
-        }
-        memcpy(key, uri, 128);
         int itr_status;
-        itr = kh_put(cursors, h, key, &itr_status);
+        itr = kh_put(cursors, h, uri, &itr_status);
         kh_value(h, itr) = *cursor;
         enif_mutex_unlock(conn_handle->contexts_mutex);
     }
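The leak fix in this file hinges on khash key ownership: the table stores only the char * it is handed and never copies or frees string keys, so the old enif_alloc'd copy was simply lost, while passing uri directly stores a borrowed pointer and assumes uri outlives the entry. A small self-contained illustration of that rule with khash's string-keyed map; the cursor_t value type is a placeholder:

/* Illustration of khash string-key ownership; value type is a placeholder. */
#include <stdio.h>
#include "khash.h"                       /* klib's khash.h */

typedef struct { int id; } cursor_t;

/* Map: const char * key -> cursor_t * value. */
KHASH_MAP_INIT_STR(cursors, cursor_t *)

int main(void)
{
    khash_t(cursors) *h = kh_init(cursors);
    static cursor_t c = { 42 };
    const char *uri = "table:test";      /* key memory owned by the caller */

    int absent;
    khiter_t it = kh_put(cursors, h, uri, &absent);
    kh_value(h, it) = &c;                /* the table stores the pointer only;
                                            it never copies or frees the key */

    it = kh_get(cursors, h, "table:test");
    if (it != kh_end(h))
        printf("found cursor %d\n", kh_value(h, it)->id);

    kh_destroy(cursors, h);              /* frees buckets, not keys or values */
    return 0;
}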