337 lines
8.8 KiB
C
337 lines
8.8 KiB
C
/* -------------------------------------------------------------------
|
|
*
|
|
* bdberl: Thread Pool
|
|
* Copyright (c) 2008 The Hive. All rights reserved.
|
|
*
|
|
* ------------------------------------------------------------------- */
|
|
#include "bdberl_tpool.h"
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
static void* bdberl_tpool_main(void* tpool);
|
|
static TPoolJob* next_job(TPool* tpool);
|
|
static int remove_pending_job(TPool* tpool, TPoolJob* job);
|
|
static void cleanup_job(TPool* tpool, TPoolJob* job);
|
|
static int is_active_job(TPool* tpool, TPoolJob* job);
|
|
|
|
#define LOCK(t) erl_drv_mutex_lock(tpool->lock)
|
|
#define UNLOCK(t) erl_drv_mutex_unlock(tpool->lock)
|
|
|
|
TPool* bdberl_tpool_start(unsigned int thread_count)
|
|
{
|
|
TPool* tpool = driver_alloc(sizeof(TPool));
|
|
memset(tpool, '\0', sizeof(TPool));
|
|
|
|
// Initialize lock, cv, etc.
|
|
tpool->lock = erl_drv_mutex_create("bdberl_tpool_lock");
|
|
tpool->work_cv = erl_drv_cond_create("bdberl_tpool_work_cv");
|
|
tpool->cancel_cv = erl_drv_cond_create("bdberl_tpool_cancel_cv");
|
|
tpool->threads = driver_alloc(sizeof(ErlDrvTid) * thread_count);
|
|
tpool->thread_count = thread_count;
|
|
|
|
// Startup all the threads
|
|
int i;
|
|
for (i = 0; i < thread_count; i++)
|
|
{
|
|
// TODO: Figure out good way to deal with errors in this situation (should be rare, but still...)
|
|
erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0);
|
|
}
|
|
|
|
return tpool;
|
|
}
|
|
|
|
void bdberl_tpool_stop(TPool* tpool)
|
|
{
|
|
LOCK(tpool);
|
|
|
|
// Set the shutdown flag and broadcast a notification
|
|
tpool->shutdown = 1;
|
|
erl_drv_cond_broadcast(tpool->work_cv);
|
|
|
|
// Clean out the queue of pending jobs -- invoke their cleanup function
|
|
|
|
// Wait for until active_threads hits zero
|
|
while (tpool->active_threads > 0)
|
|
{
|
|
erl_drv_cond_wait(tpool->work_cv, tpool->lock);
|
|
}
|
|
|
|
// Join up with all the workers
|
|
int i = 0;
|
|
for (i = 0; i < tpool->thread_count; i++)
|
|
{
|
|
erl_drv_thread_join(tpool->threads[i], 0);
|
|
}
|
|
|
|
// Cleanup
|
|
erl_drv_cond_destroy(tpool->work_cv);
|
|
erl_drv_cond_destroy(tpool->cancel_cv);
|
|
driver_free(tpool->threads);
|
|
UNLOCK(tpool);
|
|
erl_drv_mutex_destroy(tpool->lock);
|
|
driver_free(tpool);
|
|
}
|
|
|
|
void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn,
|
|
TPoolJob** job_ptr)
|
|
{
|
|
// Allocate and fill a new job structure
|
|
TPoolJob* job = *job_ptr = driver_alloc(sizeof(TPoolJob));
|
|
memset(job, '\0', sizeof(TPoolJob));
|
|
job->main_fn = main_fn;
|
|
job->arg = arg;
|
|
job->cancel_fn = cancel_fn;
|
|
|
|
// Sync up with the tpool and add the job to the pending queue
|
|
LOCK(tpool);
|
|
|
|
if (tpool->pending_jobs)
|
|
{
|
|
// Make sure the current last job points to this one next
|
|
tpool->last_pending_job->next = job;
|
|
}
|
|
else
|
|
{
|
|
// No pending jobs; this is the first
|
|
tpool->pending_jobs = job;
|
|
}
|
|
|
|
tpool->last_pending_job = job;
|
|
tpool->pending_job_count++;
|
|
|
|
// Generate a notification that there is work todo.
|
|
// TODO: I think this may not be necessary, in the case where there are already other
|
|
// pending jobs. Not sure ATM, however, so will be on safe side
|
|
erl_drv_cond_broadcast(tpool->work_cv);
|
|
UNLOCK(tpool);
|
|
}
|
|
|
|
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
|
|
{
|
|
LOCK(tpool);
|
|
|
|
// Remove the job from the pending queue
|
|
if (remove_pending_job(tpool, job))
|
|
{
|
|
// Job was removed from pending -- unlock and notify the job that it got canceled
|
|
UNLOCK(tpool);
|
|
|
|
if (job->cancel_fn)
|
|
{
|
|
(*(job->cancel_fn))(job->arg);
|
|
}
|
|
|
|
// Delete the job structure
|
|
driver_free(job);
|
|
return;
|
|
}
|
|
|
|
// Job not in the pending queue -- check the active queue.
|
|
if (is_active_job(tpool, job))
|
|
{
|
|
// Job is currently active -- mark it as cancelled (so we get notified) and wait for it
|
|
job->canceled = 1;
|
|
while (job->running)
|
|
{
|
|
erl_drv_cond_wait(tpool->cancel_cv, tpool->lock);
|
|
}
|
|
|
|
// Job is no longer running and should now be considered dead. Cleanup is handled by
|
|
// the worker.
|
|
UNLOCK(tpool);
|
|
return;
|
|
}
|
|
|
|
// Job was neither active nor pending -- it must have gotten run/cleaned up while we
|
|
// were waiting on the thread pool lock. Regardless, it's now done/gone and the cancel
|
|
// is a success.
|
|
UNLOCK(tpool);
|
|
}
|
|
|
|
static void* bdberl_tpool_main(void* arg)
|
|
{
|
|
TPool* tpool = (TPool*)arg;
|
|
|
|
LOCK(tpool);
|
|
|
|
tpool->active_threads++;
|
|
|
|
while(1)
|
|
{
|
|
// Check for shutdown...
|
|
if (tpool->shutdown)
|
|
{
|
|
tpool->active_threads--;
|
|
erl_drv_cond_broadcast(tpool->work_cv);
|
|
UNLOCK(tpool);
|
|
return 0;
|
|
}
|
|
|
|
// Get the next job
|
|
TPoolJob* job = next_job(tpool);
|
|
if (job)
|
|
{
|
|
// Unlock to avoid blocking others
|
|
UNLOCK(tpool);
|
|
|
|
// Invoke the function
|
|
(*(job->main_fn))(job->arg);
|
|
|
|
// Relock
|
|
LOCK(tpool);
|
|
|
|
// Mark the job as not running (important for cancellation to know it's done)
|
|
job->running = 0;
|
|
|
|
// If the job was cancelled, signal the cancellation cv so that anyone waiting on the
|
|
// job knows it's complete
|
|
if (job->canceled)
|
|
{
|
|
erl_drv_cond_broadcast(tpool->cancel_cv);
|
|
}
|
|
|
|
// Cleanup the job (remove from active list, free, etc.)
|
|
cleanup_job(tpool, job);
|
|
}
|
|
else
|
|
{
|
|
// Wait for a job to come available then jump back to top of loop
|
|
erl_drv_cond_wait(tpool->work_cv, tpool->lock);
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static TPoolJob* next_job(TPool* tpool)
|
|
{
|
|
if (tpool->pending_jobs)
|
|
{
|
|
// Pop the job off the queue
|
|
TPoolJob* job = tpool->pending_jobs;
|
|
tpool->pending_jobs = job->next;
|
|
|
|
// No more pending jobs; update last job pointer
|
|
if (!tpool->pending_jobs)
|
|
{
|
|
tpool->last_pending_job = 0;
|
|
}
|
|
|
|
|
|
// Mark the job as running and add to the active list
|
|
job->running = 1;
|
|
if (tpool->active_jobs)
|
|
{
|
|
job->next = tpool->active_jobs;
|
|
}
|
|
else
|
|
{
|
|
job->next = NULL;
|
|
}
|
|
tpool->active_jobs = job;
|
|
|
|
// Update counters
|
|
tpool->pending_job_count--;
|
|
tpool->active_job_count++;
|
|
|
|
return job;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int remove_pending_job(TPool* tpool, TPoolJob* job)
|
|
{
|
|
TPoolJob* current = tpool->pending_jobs;
|
|
TPoolJob* last = 0;
|
|
while (current)
|
|
{
|
|
if (current == job)
|
|
{
|
|
// Found our match -- look back and connect the last item to our next. Also,
|
|
// make sure that last_pending_job is updated accordingly
|
|
if (last)
|
|
{
|
|
last->next = current->next;
|
|
}
|
|
else
|
|
{
|
|
tpool->pending_jobs = current->next;
|
|
}
|
|
|
|
// If this job was the last one, make sure that we update the last_pending_job
|
|
// pointer to reference the _previous_ node
|
|
if (tpool->last_pending_job == job)
|
|
{
|
|
tpool->last_pending_job = last;
|
|
}
|
|
|
|
tpool->pending_job_count--;
|
|
return 1;
|
|
}
|
|
|
|
// Next...
|
|
last = current;
|
|
current = current->next;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void cleanup_job(TPool* tpool, TPoolJob* job)
|
|
{
|
|
// Loop over active jobs and remove the job from that list
|
|
TPoolJob* current = tpool->active_jobs;
|
|
TPoolJob* last = 0;
|
|
while (current)
|
|
{
|
|
if (current == job)
|
|
{
|
|
// Found our match -- look back and connect the last item to our next
|
|
if (last)
|
|
{
|
|
last->next = current->next;
|
|
}
|
|
else
|
|
{
|
|
tpool->active_jobs = current->next;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
// Move to next item
|
|
last = current;
|
|
current = current->next;
|
|
}
|
|
|
|
// Update counter and free the job structure
|
|
tpool->active_job_count--;
|
|
driver_free(job);
|
|
}
|
|
|
|
static int is_active_job(TPool* tpool, TPoolJob* job)
|
|
{
|
|
TPoolJob* current = tpool->active_jobs;
|
|
while (current)
|
|
{
|
|
if (current == job)
|
|
{
|
|
return 1;
|
|
}
|
|
|
|
current = current->next;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// Return the number of pending and active jobs
|
|
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
|
unsigned int *active_count_ptr)
|
|
{
|
|
LOCK(tpool);
|
|
*pending_count_ptr = tpool->pending_job_count;
|
|
*active_count_ptr = tpool->active_job_count;
|
|
UNLOCK(tpool);
|
|
}
|