Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9
1 changed files with 4 additions and 6 deletions
|
@ -252,7 +252,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
|
|||
we = SLIST_FIRST(&async_nif->we_joining);
|
||||
while(we != NULL) {
|
||||
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
|
||||
SLIST_REMOVE_HEAD(&async_nif->we_joining, entries);
|
||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||
enif_thread_join(we->tid, &exit_value);
|
||||
enif_free(we);
|
||||
|
@ -451,11 +451,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
q = &async_nif->queues[i];
|
||||
enif_mutex_lock(q->reqs_mutex);
|
||||
}
|
||||
|
||||
/* Set the shutdown flag so that worker threads will no continue
|
||||
executing requests. */
|
||||
async_nif->shutdown = 1;
|
||||
|
||||
for (i = 0; i < num_queues; i++) {
|
||||
q = &async_nif->queues[i];
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
|
@ -463,20 +461,20 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
|
||||
/* Join for the now exiting worker threads. */
|
||||
while(async_nif->we_active > 0) {
|
||||
|
||||
for (i = 0; i < num_queues; i++)
|
||||
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
|
||||
|
||||
enif_mutex_lock(async_nif->we_mutex);
|
||||
we = SLIST_FIRST(&async_nif->we_joining);
|
||||
while(we != NULL) {
|
||||
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
|
||||
SLIST_REMOVE_HEAD(&async_nif->we_joining, entries);
|
||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||
enif_thread_join(we->tid, &exit_value);
|
||||
enif_free(we);
|
||||
async_nif->we_active--;
|
||||
we = n;
|
||||
}
|
||||
enif_mutex_unlock(async_nif->we_mutex);
|
||||
}
|
||||
enif_mutex_destroy(async_nif->we_mutex);
|
||||
|
||||
|
|
Loading…
Reference in a new issue