bdberl/c_src/bdberl_tpool.c

337 lines
8.8 KiB
C

/* -------------------------------------------------------------------
*
* bdberl: Thread Pool
* Copyright (c) 2008 The Hive. All rights reserved.
*
* ------------------------------------------------------------------- */
#include "bdberl_tpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static void* bdberl_tpool_main(void* tpool);
static TPoolJob* next_job(TPool* tpool);
static int remove_pending_job(TPool* tpool, TPoolJob* job);
static void cleanup_job(TPool* tpool, TPoolJob* job);
static int is_active_job(TPool* tpool, TPoolJob* job);
#define LOCK(t) erl_drv_mutex_lock(tpool->lock)
#define UNLOCK(t) erl_drv_mutex_unlock(tpool->lock)
TPool* bdberl_tpool_start(unsigned int thread_count)
{
TPool* tpool = driver_alloc(sizeof(TPool));
memset(tpool, '\0', sizeof(TPool));
// Initialize lock, cv, etc.
tpool->lock = erl_drv_mutex_create("bdberl_tpool_lock");
tpool->work_cv = erl_drv_cond_create("bdberl_tpool_work_cv");
tpool->cancel_cv = erl_drv_cond_create("bdberl_tpool_cancel_cv");
tpool->threads = driver_alloc(sizeof(ErlDrvTid) * thread_count);
tpool->thread_count = thread_count;
// Startup all the threads
int i;
for (i = 0; i < thread_count; i++)
{
// TODO: Figure out good way to deal with errors in this situation (should be rare, but still...)
erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0);
}
return tpool;
}
void bdberl_tpool_stop(TPool* tpool)
{
LOCK(tpool);
// Set the shutdown flag and broadcast a notification
tpool->shutdown = 1;
erl_drv_cond_broadcast(tpool->work_cv);
// Clean out the queue of pending jobs -- invoke their cleanup function
// Wait for until active_threads hits zero
while (tpool->active_threads > 0)
{
erl_drv_cond_wait(tpool->work_cv, tpool->lock);
}
// Join up with all the workers
int i = 0;
for (i = 0; i < tpool->thread_count; i++)
{
erl_drv_thread_join(tpool->threads[i], 0);
}
// Cleanup
erl_drv_cond_destroy(tpool->work_cv);
erl_drv_cond_destroy(tpool->cancel_cv);
driver_free(tpool->threads);
UNLOCK(tpool);
erl_drv_mutex_destroy(tpool->lock);
driver_free(tpool);
}
void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn,
TPoolJob** job_ptr)
{
// Allocate and fill a new job structure
TPoolJob* job = *job_ptr = driver_alloc(sizeof(TPoolJob));
memset(job, '\0', sizeof(TPoolJob));
job->main_fn = main_fn;
job->arg = arg;
job->cancel_fn = cancel_fn;
// Sync up with the tpool and add the job to the pending queue
LOCK(tpool);
if (tpool->pending_jobs)
{
// Make sure the current last job points to this one next
tpool->last_pending_job->next = job;
}
else
{
// No pending jobs; this is the first
tpool->pending_jobs = job;
}
tpool->last_pending_job = job;
tpool->pending_job_count++;
// Generate a notification that there is work todo.
// TODO: I think this may not be necessary, in the case where there are already other
// pending jobs. Not sure ATM, however, so will be on safe side
erl_drv_cond_broadcast(tpool->work_cv);
UNLOCK(tpool);
}
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
{
LOCK(tpool);
// Remove the job from the pending queue
if (remove_pending_job(tpool, job))
{
// Job was removed from pending -- unlock and notify the job that it got canceled
UNLOCK(tpool);
if (job->cancel_fn)
{
(*(job->cancel_fn))(job->arg);
}
// Delete the job structure
driver_free(job);
return;
}
// Job not in the pending queue -- check the active queue.
if (is_active_job(tpool, job))
{
// Job is currently active -- mark it as cancelled (so we get notified) and wait for it
job->canceled = 1;
while (job->running)
{
erl_drv_cond_wait(tpool->cancel_cv, tpool->lock);
}
// Job is no longer running and should now be considered dead. Cleanup is handled by
// the worker.
UNLOCK(tpool);
return;
}
// Job was neither active nor pending -- it must have gotten run/cleaned up while we
// were waiting on the thread pool lock. Regardless, it's now done/gone and the cancel
// is a success.
UNLOCK(tpool);
}
static void* bdberl_tpool_main(void* arg)
{
TPool* tpool = (TPool*)arg;
LOCK(tpool);
tpool->active_threads++;
while(1)
{
// Check for shutdown...
if (tpool->shutdown)
{
tpool->active_threads--;
erl_drv_cond_broadcast(tpool->work_cv);
UNLOCK(tpool);
return 0;
}
// Get the next job
TPoolJob* job = next_job(tpool);
if (job)
{
// Unlock to avoid blocking others
UNLOCK(tpool);
// Invoke the function
(*(job->main_fn))(job->arg);
// Relock
LOCK(tpool);
// Mark the job as not running (important for cancellation to know it's done)
job->running = 0;
// If the job was cancelled, signal the cancellation cv so that anyone waiting on the
// job knows it's complete
if (job->canceled)
{
erl_drv_cond_broadcast(tpool->cancel_cv);
}
// Cleanup the job (remove from active list, free, etc.)
cleanup_job(tpool, job);
}
else
{
// Wait for a job to come available then jump back to top of loop
erl_drv_cond_wait(tpool->work_cv, tpool->lock);
}
}
return 0;
}
static TPoolJob* next_job(TPool* tpool)
{
if (tpool->pending_jobs)
{
// Pop the job off the queue
TPoolJob* job = tpool->pending_jobs;
tpool->pending_jobs = job->next;
// No more pending jobs; update last job pointer
if (!tpool->pending_jobs)
{
tpool->last_pending_job = 0;
}
// Mark the job as running and add to the active list
job->running = 1;
if (tpool->active_jobs)
{
job->next = tpool->active_jobs;
}
else
{
job->next = NULL;
}
tpool->active_jobs = job;
// Update counters
tpool->pending_job_count--;
tpool->active_job_count++;
return job;
}
return 0;
}
static int remove_pending_job(TPool* tpool, TPoolJob* job)
{
TPoolJob* current = tpool->pending_jobs;
TPoolJob* last = 0;
while (current)
{
if (current == job)
{
// Found our match -- look back and connect the last item to our next. Also,
// make sure that last_pending_job is updated accordingly
if (last)
{
last->next = current->next;
}
else
{
tpool->pending_jobs = current->next;
}
// If this job was the last one, make sure that we update the last_pending_job
// pointer to reference the _previous_ node
if (tpool->last_pending_job == job)
{
tpool->last_pending_job = last;
}
tpool->pending_job_count--;
return 1;
}
// Next...
last = current;
current = current->next;
}
return 0;
}
static void cleanup_job(TPool* tpool, TPoolJob* job)
{
// Loop over active jobs and remove the job from that list
TPoolJob* current = tpool->active_jobs;
TPoolJob* last = 0;
while (current)
{
if (current == job)
{
// Found our match -- look back and connect the last item to our next
if (last)
{
last->next = current->next;
}
else
{
tpool->active_jobs = current->next;
}
break;
}
// Move to next item
last = current;
current = current->next;
}
// Update counter and free the job structure
tpool->active_job_count--;
driver_free(job);
}
static int is_active_job(TPool* tpool, TPoolJob* job)
{
TPoolJob* current = tpool->active_jobs;
while (current)
{
if (current == job)
{
return 1;
}
current = current->next;
}
return 0;
}
// Return the number of pending and active jobs
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
unsigned int *active_count_ptr)
{
LOCK(tpool);
*pending_count_ptr = tpool->pending_job_count;
*active_count_ptr = tpool->active_job_count;
UNLOCK(tpool);
}