1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use builder::Builder;
use pool::Pool;
use sender::Sender;
use shutdown::Shutdown;

use futures::Future;

/// Work-stealing based thread pool for executing futures.
///
/// A `ThreadPool` owns the [`Sender`] handle through which futures are
/// submitted to the pool. It can be shut down explicitly with
/// `shutdown_on_idle`, `shutdown`, or `shutdown_now`, each of which consumes
/// the pool and returns a `Shutdown` future.
///
/// If a `ThreadPool` instance is dropped without explicitly being shut down,
/// the equivalent of `shutdown_now` is performed implicitly, forcing all tasks
/// that have not yet completed to be dropped, and the drop waits on the
/// resulting `Shutdown` future.
///
/// Create `ThreadPool` instances using `Builder`.
#[derive(Debug)]
pub struct ThreadPool {
    // `Some` for the whole usable lifetime of the pool. The explicit shutdown
    // methods `take()` the sender to move it into the returned `Shutdown`,
    // leaving `None` behind so that the `Drop` impl has nothing left to do.
    pub(crate) inner: Option<Sender>,
}

impl ThreadPool {
    /// Create a new `ThreadPool` with default values.
    ///
    /// Use [`Builder`] for creating a configured thread pool.
    ///
    /// [`Builder`]: struct.Builder.html
    pub fn new() -> ThreadPool {
        Builder::new().build()
    }

    /// Spawn a future onto the thread pool.
    ///
    /// This function takes ownership of the future and randomly assigns it to a
    /// worker thread. The thread will then start executing the future.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # extern crate tokio_threadpool;
    /// # extern crate futures;
    /// # use tokio_threadpool::ThreadPool;
    /// use futures::future::{Future, lazy};
    ///
    /// # pub fn main() {
    /// // Create a thread pool with default configuration values
    /// let thread_pool = ThreadPool::new();
    ///
    /// thread_pool.spawn(lazy(|| {
    ///     println!("called from a worker thread");
    ///     Ok(())
    /// }));
    ///
    /// // Gracefully shutdown the threadpool
    /// thread_pool.shutdown().wait().unwrap();
    /// # }
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if the spawn fails. Use [`Sender::spawn`] for a
    /// version that returns a `Result` instead of panicking.
    pub fn spawn<F>(&self, future: F)
    where F: Future<Item = (), Error = ()> + Send + 'static,
    {
        self.sender().spawn(future).unwrap();
    }

    /// Return a reference to the sender handle
    ///
    /// The handle is used to spawn futures onto the thread pool. It also
    /// implements the `Executor` trait.
    pub fn sender(&self) -> &Sender {
        self.inner.as_ref().unwrap()
    }

    /// Return a mutable reference to the sender handle
    pub fn sender_mut(&mut self) -> &mut Sender {
        self.inner.as_mut().unwrap()
    }

    /// Shutdown the pool once it becomes idle.
    ///
    /// Idle is defined as the completion of all futures that have been spawned
    /// onto the thread pool. There may still be outstanding handles when the
    /// thread pool reaches an idle state.
    ///
    /// Once the idle state is reached, calling `spawn` on any outstanding
    /// handle will result in an error. All worker threads are signaled and will
    /// shutdown. The returned future completes once all worker threads have
    /// completed the shutdown process.
    pub fn shutdown_on_idle(mut self) -> Shutdown {
        self.inner().shutdown(false, false);
        Shutdown { inner: self.inner.take().unwrap() }
    }

    /// Shutdown the pool
    ///
    /// This prevents the thread pool from accepting new tasks but will allow
    /// any existing tasks to complete.
    ///
    /// Calling `spawn` on any outstanding handle will result in an error. All
    /// worker threads are signaled and will shutdown. The returned future
    /// completes once all worker threads have completed the shutdown process.
    pub fn shutdown(mut self) -> Shutdown {
        self.inner().shutdown(true, false);
        Shutdown { inner: self.inner.take().unwrap() }
    }

    /// Shutdown the pool immediately
    ///
    /// This will prevent the thread pool from accepting new tasks **and**
    /// abort any tasks that are currently running on the thread pool.
    ///
    /// Calling `spawn` on any outstanding handle will result in an error. All
    /// worker threads are signaled and will shutdown. The returned future
    /// completes once all worker threads have completed the shutdown process.
    pub fn shutdown_now(mut self) -> Shutdown {
        self.inner().shutdown(true, true);
        Shutdown { inner: self.inner.take().unwrap() }
    }

    fn inner(&self) -> &Pool {
        &*self.inner.as_ref().unwrap().inner
    }
}

impl Drop for ThreadPool {
    /// Force the pool to shut down if it has not been shut down explicitly.
    ///
    /// When the sender is still present, the pool is told to shut down with
    /// the same flags `shutdown_now` uses, and the drop then waits on the
    /// resulting `Shutdown` future, discarding its result.
    fn drop(&mut self) {
        let sender = match self.inner.take() {
            Some(sender) => sender,
            // An explicit shutdown method already took the sender.
            None => return,
        };

        sender.inner.shutdown(true, true);

        // The outcome is deliberately ignored: nothing can be reported from
        // a destructor.
        let _ = Shutdown { inner: sender }.wait();
    }
}