From 352c16425d7290acabd09fe4e5a6af00913f54bd Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Thu, 15 Mar 2018 15:47:22 +0000 Subject: [PATCH] Command Queue Executor to watch for new commands and execute on longer running background thread --- db/src/tx_observer.rs | 93 ++++++++++++++++++++++++++++++++----------- src/conn.rs | 20 +++++----- 2 files changed, 80 insertions(+), 33 deletions(-) diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 92429ac0..fe74b537 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -8,13 +8,16 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use std::collections::{ - VecDeque, -}; use std::sync::{ Arc, Weak, }; +use std::sync::mpsc::{ + channel, + Receiver, + RecvError, + Sender, +}; use std::thread; use indexmap::{ @@ -56,46 +59,45 @@ pub trait Command { fn execute(&mut self); } -pub struct AsyncTxExecutor { +pub struct TxCommand { reports: Vec, observers: Weak>>, } -impl AsyncTxExecutor { +impl TxCommand { fn new(observers: &Arc>>, reports: Vec) -> Self { - AsyncTxExecutor { + TxCommand { reports, observers: Arc::downgrade(observers), } } } -impl Command for AsyncTxExecutor { - +impl Command for TxCommand { fn execute(&mut self) { - let reports = ::std::mem::replace(&mut self.reports, Vec::new()); - let weak_observers = ::std::mem::replace(&mut self.observers, Default::default()); - thread::spawn (move || { - weak_observers.upgrade().map(|observers| { - for (key, observer) in observers.iter() { - let applicable_reports = observer.applicable_reports(&reports); + self.observers.upgrade().map(|observers| { + for (key, observer) in observers.iter() { + let applicable_reports = observer.applicable_reports(&self.reports); + if !applicable_reports.is_empty() { observer.notify(key.clone(), applicable_reports); } - }) + } }); } } pub struct TxObservationService { observers: Arc>>, - pub command_queue: VecDeque>, + executor: Option>>, + in_progress_count: i32, } impl TxObservationService { pub fn new() -> Self { TxObservationService { observers: Arc::new(IndexMap::new()), - command_queue: VecDeque::new(), + executor: None, + in_progress_count: 0, } } @@ -116,15 +118,58 @@ impl TxObservationService { !self.observers.is_empty() } - pub fn transaction_did_commit(&mut self, reports: Vec) { - self.command_queue.push_back(Box::new(AsyncTxExecutor::new(&self.observers, reports))); + pub fn transaction_did_start(&mut self) { + self.in_progress_count += 1; } - pub fn run(&mut self) { - let mut command = self.command_queue.pop_front(); - while command.is_some() { - command.map(|mut c| c.execute()); - command = self.command_queue.pop_front(); + pub fn transaction_did_commit(&mut self, reports: Vec) { + { + let executor = self.executor.get_or_insert_with(||{ + let (tx, rx): (Sender>, Receiver>) = channel(); + let mut worker = CommandExecutor::new(rx); + + thread::spawn(move || { + worker.main(); + }); + + tx + }); + + let cmd = Box::new(TxCommand::new(&self.observers, reports)); + executor.send(cmd).unwrap(); + } + + self.in_progress_count -= 1; + + if self.in_progress_count == 0 { + self.executor = None; + } + } +} + +struct CommandExecutor { + reciever: Receiver>, +} + +impl CommandExecutor { + fn new(rx: Receiver>) -> Self { + CommandExecutor { + reciever: rx, + } + } + + fn main(&mut self) { + loop { + match self.reciever.recv() { + Err(RecvError) => { + eprintln!("Disconnected, terminating CommandExecutor"); + return + }, + + Ok(mut cmd) => { + cmd.execute() + }, + } } } } diff --git a/src/conn.rs b/src/conn.rs index 1bd666a0..6e58ad0d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -446,12 +446,6 @@ impl<'a, 'c> InProgress<'a, 'c> { metadata.generation += 1; metadata.partition_map = self.partition_map; - // let the transaction observer know that there have been some transactions committed. - if let Some(ref observer_service) = self.observer_service { - let mut os = observer_service.lock().unwrap(); - os.transaction_did_commit(self.tx_reports); - } - // Update the conn's cache if we made any changes. self.cache.commit_to(&mut metadata.attribute_cache); @@ -463,10 +457,10 @@ impl<'a, 'c> InProgress<'a, 'c> { // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. } - // run any commands that we've created along the way. + // let the transaction observer know that there have been some transactions committed. if let Some(ref observer_service) = self.observer_service { let mut os = observer_service.lock().unwrap(); - os.run(); + os.transaction_did_commit(self.tx_reports); } Ok(()) @@ -721,6 +715,14 @@ impl Conn { current.attribute_cache.clone()) }; + let mut obs = self.tx_observer_service.lock().unwrap(); + let observer_service = if obs.has_observers() { + obs.transaction_did_start(); + Some(&self.tx_observer_service) + } else { + None + }; + Ok(InProgress { mutex: &self.metadata, transaction: tx, @@ -730,7 +732,7 @@ impl Conn { cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, tx_reports: Vec::new(), - observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None }, + observer_service: observer_service, }) }